Re: Kafka -> BigQueryIO Beam/Dataflow job ends up with weird encoding

2022-01-10 Thread Kaymak, Tobias
Hello Tobias,

Have you thought about the encoding of the String? No, you have not. Adding:

String input = new String(data, StandardCharsets.UTF_8);

Solved the issue for me.

Have a great 2022!
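
For readers hitting the same symptom: two replacement characters in place of one "ü" are what you would expect when the two UTF-8 bytes of that character are decoded with a single-byte charset such as US-ASCII. Constructing a String from bytes without naming a charset uses the JVM default, which can differ between environments, so that is a plausible (not confirmed) cause here. A minimal JDK-only sketch; the class and method names are made up for illustration:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class CharsetDemo {
    // Decode with an explicit charset so the result does not depend on
    // the JVM default charset of the machine running the pipeline.
    static String decode(byte[] data, Charset charset) {
        return new String(data, charset);
    }

    public static void main(String[] args) {
        // "Zurich" with a u-umlaut, written as an escape to keep the source ASCII-only.
        String original = "Z\u00fcrich";
        byte[] data = original.getBytes(StandardCharsets.UTF_8);

        System.out.println("JVM default charset: " + Charset.defaultCharset());
        System.out.println("UTF-8 round trip ok: "
            + decode(data, StandardCharsets.UTF_8).equals(original));

        // Each byte outside the ASCII range becomes U+FFFD when decoded as US-ASCII.
        String asAscii = decode(data, StandardCharsets.US_ASCII);
        long replaced = asAscii.chars().filter(c -> c == 0xFFFD).count();
        System.out.println("Replacement characters when decoded as US-ASCII: " + replaced);
    }
}
```

Printing `Charset.defaultCharset()` on both the old and the new runner is a cheap way to check whether the default actually changed during the migration.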



Kafka -> BigQueryIO Beam/Dataflow job ends up with weird encoding

2022-01-10 Thread Kaymak, Tobias
Hello and Happy New Year!

I am migrating a Java Beam pipeline from 2.27.0 to 2.34.0 and from Flink to
Dataflow.

I have unit tests for the easy ParDo transforms but along the way somehow
my encoding gets screwed up. I replaced my JSON to TableRow step with the
one from the official Google/Teleport repo - with no effect:

// Parse the JSON into a {@link TableRow} object.
try (InputStream inputStream =
new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
  row = TableRowJsonCoder.of().decode(inputStream, Context.OUTER);

} catch (IOException e) {
  throw new RuntimeException("Failed to serialize json to table row: "
+ json, e);
}

The pipeline does nothing fancy except being able to read from ProtoBinary
and ProtoJson depending on a config.

My problematic result in BigQuery looks like this:

Z��rich

(should be Zürich).

Has anyone ever encountered this problem? Does it ring a bell?

Best,
Tobias


Re: Beam/Flink's netty versions seems to clash (2.32.0 / 1.13.1)

2021-09-23 Thread Kaymak, Tobias
Ok, thank you!

On Thu, Sep 23, 2021 at 6:06 PM Reuven Lax  wrote:

> The bug will cause these logs and might also cause some performance
> problems. It should not cause any data correctness problems, though.
>
> The bug fix will be available in Beam 2.34.0, ETA probably this November.
>

Re: Beam/Flink's netty versions seems to clash (2.32.0 / 1.13.1)

2021-09-23 Thread Kaymak, Tobias
Hi +Reuven Lax  -

I saw your post when researching the error above:
https://stackoverflow.com/a/69111493 :)

As we are on Flink 1.13.1 in the middle of a move, it would be tough to
have to downgrade Flink to downgrade Beam here.
Is the warning something of concern or can we expect the data to be
delivered (and ignore this warning for now, and wait for the next release
and then upgrade to it)?

Best,
Tobi

On Thu, Sep 23, 2021 at 2:51 PM Kaymak, Tobias 
wrote:

> Hello,
> I am using the GCP IOs to connect to BigQuery and Bigtable. I seem to have
> improved the situation by including Bigtable and BigQuery in my POM
> with the exact same version as the Beam 2.32.0 SDK. However I still get a
> lot of these errors:
>
>  io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference
> cleanQueue
>  SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=3, target=
> bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~*~*~*
>  Make sure to call shutdown()/shutdownNow() and wait until
> awaitTermination() returns true.
>  java.lang.RuntimeException: ManagedChannel allocation site
> at
> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
> at
> io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
> at
> io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
> at
> io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:615)
> at
> io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
> at
> com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:327)
> at
> com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1700(InstantiatingGrpcChannelProvider.java:74)
> at
> com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:220)
> at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
> at
> com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:227)
> at
> com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:210)
> at
> com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
> at
> com.google.cloud.bigquery.storage.v1beta2.stub.GrpcBigQueryWriteStub.create(GrpcBigQueryWriteStub.java:138)
> at
> com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings.createStub(BigQueryWriteStubSettings.java:145)
> at
> com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.<init>(BigQueryWriteClient.java:128)
> at
> com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.create(BigQueryWriteClient.java:109)
> at
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient(BigQueryServicesImpl.java:1279)
> at
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800(BigQueryServicesImpl.java:139)
> at
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:510)
> at
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init>(BigQueryServicesImpl.java:455)
> at
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService(BigQueryServicesImpl.java:172)
> at
> org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.finishBundle(WriteTables.java:284)
> at
> org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn$DoFnInvoker.invokeFinishBundle(Unknown
> Source)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:242)
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:89)
> at
> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:125)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:894)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:887)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$2(DoFnOperator.java:529)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1419)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.

Re: Beam/Flink's netty versions seems to clash (2.32.0 / 1.13.1)

2021-09-23 Thread Kaymak, Tobias
 |  +- net.bytebuddy:byte-buddy:jar:1.10.19:test
 |  +- net.bytebuddy:byte-buddy-agent:jar:1.10.19:test
 |  \- org.objenesis:objenesis:jar:3.1:test
 +- com.google.auto.value:auto-value-annotations:jar:1.6.2:compile
 +- com.google.auto.value:auto-value:jar:1.6.2:provided
 +- org.projectlombok:lombok:jar:1.16.22:compile
 +- com.jayway.jsonpath:json-path:jar:2.3.0:compile
 |  \- net.minidev:json-smart:jar:2.3:compile
 |     \- net.minidev:accessors-smart:jar:1.2:compile
 |        \- org.ow2.asm:asm:jar:5.0.4:compile
 +- org.apache.commons:commons-lang3:jar:3.8.1:compile
 +- com.google.cloud:google-cloud-bigquery:jar:1.127.12:compile
 |  +- com.google.api.grpc:proto-google-iam-v1:jar:1.0.12:compile
 |  +- com.google.http-client:google-http-client-gson:jar:1.39.2:compile
 |  +- com.google.cloud:google-cloud-core-http:jar:1.94.6:compile
 |  +- com.google.http-client:google-http-client-appengine:jar:1.39.2:compile
 |  +- commons-logging:commons-logging:jar:1.2:compile
 |  +- commons-codec:commons-codec:jar:1.15:compile
 |  +- org.checkerframework:checker-compat-qual:jar:2.5.5:compile
 |  +- com.google.guava:failureaccess:jar:1.0.1:compile
 |  \- com.google.guava:listenablefuture:jar:9999.0-empty-to-avoid-conflict-with-guava:compile
 +- com.google.cloud:google-cloud-bigtable:jar:1.22.0:compile
 |  +- com.google.api.grpc:grpc-google-cloud-bigtable-v2:jar:1.22.0:compile
 |  +- org.conscrypt:conscrypt-openjdk-uber:jar:2.5.1:compile
 |  +- org.codehaus.mojo:animal-sniffer-annotations:jar:1.20:runtime
 |  +- com.google.android:annotations:jar:4.1.1.4:runtime
 |  +- io.perfmark:perfmark-api:jar:0.23.0:runtime
 |  +- io.grpc:grpc-protobuf:jar:1.37.0:compile
 |  \- io.grpc:grpc-protobuf-lite:jar:1.37.0:compile
 +- org.apache.beam:beam-sdks-java-extensions-protobuf:jar:2.32.0:compile
 +- org.jetbrains:annotations:jar:20.1.0:compile
 +- com.google.cloud:google-cloud-bigtable-emulator:jar:0.128.2:test
 \- commons-io:commons-io:jar:2.6:compile

Best,
Tobi

On Thu, Sep 23, 2021 at 6:15 AM Tomo Suzuki  wrote:

> From the package name, I think you want to focus on grpc-netty-shaded
> artifact rather than netty.
>
> Stacktrace and the dependency tree would be helpful to understand why the
> discrepancy occurred.
>
> Regards,
> Tomo
>
> On Wed, Sep 22, 2021 at 12:21 Reuven Lax  wrote:
>
>> Are you using any of the GCP IOs in your pipeline?
>>


Beam/Flink's netty versions seems to clash (2.32.0 / 1.13.1)

2021-09-22 Thread Kaymak, Tobias
Hello,

while upgrading our cluster and pipelines to the new versions I noticed:

java.lang.NoClassDefFoundError:
io/grpc/netty/shaded/io/netty/channel/FailedChannelFuture

so I checked the maven dependency tree and found that the:

org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.32.0:compile

dependency is using netty 4.1.52 - while Flink 1.13.1 seems to use 4.1.49.

Is there a way to fix this?

Best,
Tobi
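
When two versions of a library may be on the classpath, it can help to ask the JVM which JAR a class is actually loaded from. The sketch below uses only JDK calls; the class name `WhichJar` and the helper `locate` are hypothetical, and it has to run with the same classpath (or inside the same job) as the failing code to be meaningful:

```java
import java.security.CodeSource;

public class WhichJar {
    // Returns the location (usually a JAR URL) a class was loaded from,
    // or a short note if it comes from the JDK or cannot be loaded.
    static String locate(String className) {
        try {
            // initialize=false: load the class without running static initializers.
            Class<?> c = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            return src == null ? "bootstrap/JDK" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // Pass fully qualified class names as arguments.
        for (String name : args) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

For this thread one would pass `io.grpc.netty.shaded.io.netty.channel.FailedChannelFuture`, the class named in the error above.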


Re: Beam stuck when trying to run on remote Flink cluster

2021-02-12 Thread Kaymak, Tobias
Hey Nir,

Could you elaborate on how your setup looks? Are you using Java or Python?
Which Flink version? Is the cluster running on K8s? Do you use the portable
runner or the classic one?

On Mon, Feb 8, 2021 at 5:41 PM Joseph Zack  wrote:

> unsubscribe
>
> On Mon, Feb 8, 2021 at 5:06 AM Nir Gazit  wrote:
>
>> Hey,
>> I'm trying to run a super simple word-count example on a remote Flink
>> cluster. Running it on a local cluster works great, but when I try to
>> submit it to a remote cluster it's just stuck forever, no error or
>> anything. How can I debug this?
>>
>> Thanks!
>> Nir
>>
>
>


Beam 2.27.0 - Flink 1.11.3 - watermark not progressing?

2021-02-12 Thread Kaymak, Tobias
Hello,

I am currently updating from Beam 2.25.0 to Beam 2.27.0 and from Flink
1.10.3 to Flink 1.11.3.

My pipeline reads from two Kafka topics and windows them via:

Window.>into(
FixedWindows.of(Duration.standardMinutes(1)))
.withAllowedLateness(Duration.standardMinutes(30))
.discardingFiredPanes()

followed by a CoGroupByKey - when testing it I see the following behavior:

1. When starting with an offset that goes back 48 hours, the pipeline
consumes everything up to the latest message before it starts the
operators after the CoGroupByKey. This seems to have caused OOM errors.

2. This is visible in the Flink frontend like this

[image: Screenshot 2021-02-12 at 09.56.39.png]
The behavior I would expect is that as soon as the watermark passes the end
of the window and the allowed lateness, it should start trickling down into
the other operators. Or is this some kind of memory and speed optimization?

Best,
Tobi
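
A note on the timing involved: with Beam's default trigger the on-time pane of a window is emitted once the watermark passes the end of that window, and the allowed lateness only determines how long late elements are still accepted afterwards. The arithmetic for the 1-minute windows and 30 minutes of lateness above can be sketched with plain java.time. This is a simplified model, not Beam code, and the class and method names are hypothetical:

```java
import java.time.Duration;
import java.time.Instant;

public class FixedWindowMath {
    // Start of the fixed window (aligned to the epoch) that contains the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long t = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t, size.toMillis()));
    }

    // Exclusive end of that window.
    static Instant windowEnd(Instant timestamp, Duration size) {
        return windowStart(timestamp, size).plus(size);
    }

    // The on-time pane is due once the watermark has reached the window end.
    static boolean onTimePaneDue(Instant watermark, Instant windowEnd) {
        return !watermark.isBefore(windowEnd);
    }

    // After window end plus allowed lateness, late elements for the window are dropped.
    static boolean windowExpired(Instant watermark, Instant windowEnd, Duration allowedLateness) {
        return !watermark.isBefore(windowEnd.plus(allowedLateness));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(1);
        Duration lateness = Duration.ofMinutes(30);
        Instant element = Instant.parse("2021-02-12T09:56:39Z");
        Instant end = windowEnd(element, size);
        System.out.println("window: [" + windowStart(element, size) + ", " + end + ")");
        System.out.println("expires at watermark: " + end.plus(lateness));
    }
}
```

In this model nothing downstream of the grouping can fire until the watermark moves, so how the watermark advances while the 48-hour backlog is read is the first thing worth checking.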


Re: Testing a DoFn with a Create.of() and a KafkaRecord

2020-12-09 Thread Kaymak, Tobias
The

Pipeline p = TestPipeline.create();
p.getCoderRegistry().registerCoderForClass(KafkaRecord.class,
    KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

approach works, I just forgot to generate fake headers (passed null to the
constructor). :)



Testing a DoFn with a Create.of() and a KafkaRecord

2020-12-09 Thread Kaymak, Tobias
According to the documentation [0] the Create.of() works only for
"Standard" types, but shouldn't it in theory also work for non-standard
types when the Coder is specified?

I want to test a DoFn that receives a KafkaRecord<String, String> as an input:

KafkaRecord input = new KafkaRecord(topic, partition,
    offset, timestamp,
    kafkaTimestampType, null, kv);
KafkaRecordCoder kafkaRecordCoder =
    KafkaRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
PCollection<KafkaRecord<String, String>> records =
    p.apply(
        Create.of(input).withCoder(kafkaRecordCoder));

But that fails with

java.lang.IllegalArgumentException: Unable to infer a coder and no Coder
was specified. Please set a coder by invoking Create.withCoder() explicitly
 or a schema by invoking Create.withSchema().

[..]
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Unable
to provide a Coder for org.apache.beam.sdk.io.kafka.KafkaRecord.
  Building a Coder using a registered CoderProvider failed.

However, when I register a CoderProvider for that TestPipeline object:

Pipeline p = TestPipeline.create();
p.getCoderRegistry().registerCoderForClass(KafkaRecord.class,
KafkaRecordCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of()));

I get the following NPE:

java.lang.NullPointerException
 at
org.apache.beam.sdk.io.kafka.KafkaRecordCoder.toIterable(KafkaRecordCoder.java:98)
 at
org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:65)
 at
org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:40)
 at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
 at
org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
 at
org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
 at
org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
 at
org.apache.beam.sdk.transforms.Create$Values$CreateSource.fromIterable(Create.java:408)
 at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:365)
 at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:272)
 at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
 at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:476)
 at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
 at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:171)
 (...)

And when I try to set the Coder like:

 p.apply(
Create.of(input).withCoder(kafkaRecordCoder));

My IDE says:
java: incompatible types: org.apache.beam.sdk.values.POutput cannot be
converted to
org.apache.beam.sdk.values.PCollection>

What am I missing?

[0] https://beam.apache.org/documentation/pipelines/test-your-pipeline/


Re: Java/Flink - Flink's shaded Netty and Beam's Netty clash

2020-12-04 Thread Kaymak, Tobias
Hi,

sorry that I did not answer for so long, I didn't have the time to
investigate further. Where in the logs would I typically find the linkage
error?
If I cancel the task in the Flink web interface, the log looks like this:


INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove job
0e766abe03329b190279b6790a8aafc7 from job leader monitoring.
INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
Stopping ZooKeeperLeaderRetrievalService
/leader/0e766abe03329b190279b6790a8aafc7/job_manager_lock.
INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
connection for job 0e766abe03329b190279b6790a8aafc7.
INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
connection for job 0e766abe03329b190279b6790a8aafc7.
INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot
reconnect to job 0e766abe03329b190279b6790a8aafc7 because it is not
registered.
WARN io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.
java.lang.NoClassDefFoundError: io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$2
at io.grpc.netty.shaded.io.netty.util.collection.IntObjectHashMap.values(IntObjectHashMap.java:221)
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.closeAll(EpollEventLoop.java:436)
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:401)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: io.grpc.netty.shaded.io.netty.util.collection.IntObjectHashMap$2
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 7 more
WARN io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop -
Unexpected exception in the selector loop.

[The last message is then repeated]
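
As background to Luke's suggestion in this thread to look for the first linkage error: when a static initializer fails, the JVM reports the real cause only once, as an ExceptionInInitializerError, and every later use of the class fails with a NoClassDefFoundError even though the class file is present. The JDK-only sketch below reproduces that case with made-up class names. Note that the log above instead shows a ClassNotFoundException as the cause, so this demonstrates the general mechanism and is not a diagnosis of this job:

```java
public class LinkageDemo {
    // A class whose static initialization always fails.
    static class Broken {
        static final int VALUE = init();

        static int init() {
            throw new IllegalStateException("static init failed");
        }
    }

    // Touches Broken and reports which linkage error was thrown.
    static String tryLoad() {
        try {
            return "loaded, value=" + Broken.VALUE;
        } catch (LinkageError e) {
            Throwable cause = e.getCause();
            return e.getClass().getSimpleName()
                + (cause == null ? "" : " caused by " + cause.getClass().getSimpleName());
        }
    }

    public static void main(String[] args) {
        // First use: ExceptionInInitializerError carrying the real cause.
        System.out.println("first use:  " + tryLoad());
        // Later uses: NoClassDefFoundError, although the class file exists.
        System.out.println("second use: " + tryLoad());
    }
}
```

This is why the earliest error in a task manager log is usually more informative than the one that keeps repeating.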

On Fri, Oct 2, 2020 at 5:50 PM Luke Cwik  wrote:

> I have seen NoClassDefFoundErrors even when the class is there if there is
> an issue loading the class (usually related to JNI failing to load or a
> static block failing). Try to find the first linkage error
> (ExceptionInInitializerError / UnsatisfiedLinkError / ...) in the logs as it
> typically has more details as to why loading failed.
>
> On Fri, Oct 2, 2020 at 8:30 AM Tomo Suzuki  wrote:
>
>> I suspected that io.grpc:grpc-netty-shaded:jar:1.27.2 was incorrectly
>> shaded, but the JAR file contains the
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$2 which is
>> reported as missing. Strange.
>>
>> suztomo-macbookpro44% jar tf grpc-netty-shaded-1.27.2.jar | grep IntObjectHashMap
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$2.class
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$KeySet.class
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$MapIterator.class
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$MapEntry.class
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$2$1.class
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$PrimitiveIterator.class
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap.class
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$1.class
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$KeySet$1.class
>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$EntrySet.class
>>
>> On Fri, Oct 2, 2020 at 6:37 AM Kaymak, Tobias 
>> wrote:
>>
>>> No, that was not the case. I'm still seeing this message when canceling
>>> a pipeline. Sorry for the spam.
>>>
>>> On Fri, Oct 2, 2020 at 12:22 PM Kaymak, Tobias 
>>> wrote:
>>>
>>>> I think this was caused by having the flink-runner defined twice in my
>>>> pom. Oo
>>>> (one time as defined with scope runtime, and one time without)
>>>>
>>>>
>>>> On Fri, Oct 2, 2020 at 9:38 AM Kaymak, Tobias 
>>>> wrote:
>>>>
>>>>> Sorry that I forgot to inc

Re: Beam streaming BigQueryIO pipeline on a Flink cluster misses elements

2020-11-06 Thread Kaymak, Tobias
There is no error in Flink or Beam. Running the two pipelines in parallel just
revealed that we had late data we were not aware of before.
As we had a strict drop policy in the second pipeline (withAllowedLateness()),
we dropped these late records.
The second pipeline showed us that our mental model was wrong. After
digging into the source and looking at percentiles, we found that on some
days we receive *really late* data, which we need to account for upstream.

Just another awesome finding while using Beam. Kudos to the people
implementing windowing and triggers, this is amazing!
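
For anyone who wants to repeat the percentile check mentioned above: given a sample of per-record lateness values (for example arrival time minus event time), a nearest-rank percentile shows how much allowed lateness would be needed to keep a given share of records. This is a JDK-only sketch with hypothetical names and made-up sample values, not the analysis we actually ran:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LatenessPercentiles {
    // Nearest-rank percentile: the smallest sample such that at least
    // p percent of all samples are less than or equal to it.
    static Duration percentile(List<Duration> latenesses, double p) {
        if (latenesses.isEmpty()) {
            throw new IllegalArgumentException("no samples");
        }
        if (p <= 0 || p > 100) {
            throw new IllegalArgumentException("p must be in (0, 100]");
        }
        List<Duration> sorted = new ArrayList<>(latenesses);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.size());
        return sorted.get(rank - 1);
    }

    public static void main(String[] args) {
        // Made-up sample: most records arrive within seconds, one is hours late.
        List<Duration> sample = Arrays.asList(
            Duration.ofSeconds(1), Duration.ofSeconds(2), Duration.ofSeconds(3),
            Duration.ofSeconds(4), Duration.ofHours(6));
        System.out.println("p50 = " + percentile(sample, 50));
        System.out.println("p99 = " + percentile(sample, 99));
    }
}
```

A large gap between the median and the high percentiles is the pattern that points to occasional very late data.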

On Wed, Nov 4, 2020 at 1:36 PM Kaymak, Tobias 
wrote:

> Hello,
>
> while investigating potential benefits of switching BigQueryIO from
> FILE_LOADS to streaming inserts, I found a potential edge case that might
> be related to the way the BigQueryIO is being handled on a Flink cluster:
>
> Flink's task managers run as preemptible instances in a GKE
> cluster's node pool. This means they can be terminated at any time by Google,
> but will be respawned within 5 minutes or so.
>
> As the job manager is being run on a fixed node pool, in theory this means
> that a pipeline will be shortly interrupted, but resume as soon as the task
> manager is respawned.
>
> Now, with checkpointing and EXACTLY_ONCE processing enabled, comparing the
> BigQuery streaming vs. the non streaming inserts showed that the streaming
> one was missing a couple of elements, all from the same close timestamp
> range.
>
> Checking the GKE logs I saw that one task manager got respawned a couple
> of minutes earlier. There were no ERROR messages regarding streaming insert
> problems towards BigQuery so my suspicion is that the BigQuery sink somehow
> might have lost some records here.
>
> I ran the streaming-inserts pipeline again this morning, and the records
> were correctly inserted into BigQuery - none was missed like during the
> night.
>
> Any advice for me on how to dig deeper here?
>
> [Beam 2.24.0 / Flink 1.10.2]
>
> Best,
> Tobi
>


Re: Beam 2.25.0 / Flink 1.11.2 - Job failing after upgrading from 2.24.0 / Flink 1.10.2

2020-11-04 Thread Kaymak, Tobias
Hi Jan,

thank you for your response, I created a JIRA ticket
https://issues.apache.org/jira/browse/BEAM-11191



On Wed, Nov 4, 2020 at 2:17 PM Jan Lukavský  wrote:

> Hi Tobias,
>
> this looks like a bug. The clearGlobalState method was introduced in
> 2.25.0, and it seems it might have issues related to RocksDB. Can you
> file a Jira for that, please?
>
> Thanks,
>
>  Jan
> On 11/4/20 9:50 AM, Kaymak, Tobias wrote:
>
> When running our Kafka-To-BigQuery pipeline with the Flink 1.11.2 Docker
> image,
> the following exception is visible for the failing job on the *job
> manager*:
>
> 2020-11-04 09:27:14
> java.lang.RuntimeException: Failed to cleanup global state.
> at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.runtime.state.VoidNamespace
> at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
> at org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)
> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)
> ... 17 more
> This is from the *task manager's* logs:
> 2020-11-04 08:46:31,250 WARN  org.apache.flink.runtime.taskmanager.Task
>  [] -
> BigQueryIO.Write/BatchLoads/JobIdCreationRoot_LOAD/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Splittable
> ueryIO.Write/BatchLoads/CreateJobId_LOAD/ParMultiDo(Anonymous) ->
> BigQueryIO.Write/BatchLoads/JobIdSideInput_LOAD/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> -> ToKeyedWorkItem (1/1) (bebac6c581d1b8ece88007ec0
>
>
>
> java.lang.RuntimeException: Failed to cleanup global state.
>
>
>   at
> org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)
> ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>   at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)
> ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]

Beam streaming BigQueryIO pipeline on a Flink cluster misses elements

2020-11-04 Thread Kaymak, Tobias
Hello,

while investigating potential benefits of switching BigQueryIO from
FILE_LOADS to streaming inserts, I found a potential edge case that might
be related to the way the BigQueryIO is being handled on a Flink cluster:

Flink's task managers run as preemptible instances in a GKE cluster's node
pool. This means Google can terminate them at any time, but they are
respawned within 5 minutes or so.

As the job manager runs on a fixed node pool, in theory this means that a
pipeline is briefly interrupted, but resumes as soon as the task manager is
respawned.

Now, with checkpointing and EXACTLY_ONCE processing enabled, comparing the
results of the BigQuery streaming inserts with those of the non-streaming
(FILE_LOADS) inserts showed that the streaming run was missing a couple of
elements, all from the same narrow timestamp range.
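
A comparison like the one described above can be sketched in plain Java. This is only an illustration: the class and method names are hypothetical, the sample IDs and timestamps are made up, and it assumes both result sets have already been exported from BigQuery as element ID plus event timestamp. It lists the elements present in the reference (FILE_LOADS) result but absent from the streaming result, sorted by timestamp, so the affected time range can be lined up with the task manager restart in the GKE logs.

```java
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: not part of Beam, Flink or the BigQuery client.
final class MissingElementsDiff {

    // Returns the (id, timestamp) pairs that exist in the reference result
    // but not in the candidate result, ordered by timestamp.
    static List<Map.Entry<String, Instant>> missing(
            Map<String, Instant> reference, Set<String> candidateIds) {
        List<Map.Entry<String, Instant>> out = new ArrayList<>();
        for (Map.Entry<String, Instant> e : reference.entrySet()) {
            if (!candidateIds.contains(e.getKey())) {
                out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
            }
        }
        out.sort(Map.Entry.comparingByValue());
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for the two exported result sets.
        Map<String, Instant> fileLoads = new LinkedHashMap<>();
        fileLoads.put("a", Instant.parse("2020-11-04T01:00:00Z"));
        fileLoads.put("c", Instant.parse("2020-11-04T01:02:30Z"));
        fileLoads.put("b", Instant.parse("2020-11-04T01:02:00Z"));

        Set<String> streaming = new HashSet<>();
        streaming.add("a");

        List<Map.Entry<String, Instant>> gaps = missing(fileLoads, streaming);
        for (Map.Entry<String, Instant> e : gaps) {
            System.out.println(e.getKey() + " @ " + e.getValue());
        }
        if (!gaps.isEmpty()) {
            System.out.println("range: " + gaps.get(0).getValue()
                    + " .. " + gaps.get(gaps.size() - 1).getValue());
        }
    }
}
```

If the missing range starts shortly before a restart, that would at least be consistent with records being lost around the interruption, though it would not prove where they were lost.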

Checking the GKE logs I saw that one task manager got respawned a couple of
minutes earlier. There were no ERROR messages regarding streaming insert
problems towards BigQuery so my suspicion is that the BigQuery sink somehow
might have lost some records here.

I ran the streaming-inserts pipeline again this morning, and the records
were correctly inserted into BigQuery - none were missing, unlike during
the night.

Any advice for me on how to dig deeper here?

[Beam 2.24.0 / Flink 1.10.2]

Best,
Tobi


Re: Beam 2.25.0 / Flink 1.11.2 - Job failing after upgrading from 2.24.0 / Flink 1.10.2

2020-11-04 Thread Kaymak, Tobias
I see the same problem for Beam 2.25.0 / Flink 1.11.1 and 1.10.2 so it
seems to be related to the upgrade to Beam 2.25.0 from 2.24.0


Beam 2.25.0 / Flink 1.11.2 - Job failing after upgrading from 2.24.0 / Flink 1.10.2

2020-11-04 Thread Kaymak, Tobias
When running our Kafka-To-BigQuery pipeline with the Flink 1.11.2 Docker
image, the following exception is visible for the failing job on the *job
manager*:

2020-11-04 09:27:14
java.lang.RuntimeException: Failed to cleanup global state.
at org.apache.beam.runners.flink.translation.wrappers.streaming.state.
FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)
at org.apache.beam.runners.flink.translation.wrappers.streaming.
DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)
at org.apache.beam.runners.flink.translation.wrappers.streaming.
DoFnOperator.processWatermark1(DoFnOperator.java:741)
at org.apache.beam.runners.flink.translation.wrappers.streaming.
DoFnOperator.processWatermark(DoFnOperator.java:713)
at org.apache.flink.streaming.runtime.tasks.
OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask
.java:167)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve
.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve
.java:179)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve
.inputWatermark(StatusWatermarkValve.java:101)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.processElement(StreamTaskNetworkInput.java:180)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:153)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:351)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxStep(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:566)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.flink.runtime.state.VoidNamespace
at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(
VoidNamespaceSerializer.java:32)
at org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils
.writeNameSpace(RocksDBKeySerializationUtils.java:77)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
.getKeys(RocksDBKeyedStateBackend.java:291)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend
.applyToAllKeys(AbstractKeyedStateBackend.java:242)
at org.apache.beam.runners.flink.translation.wrappers.streaming.state.
FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)
... 17 more

This is from the *task manager's* logs:

2020-11-04 08:46:31,250 WARN  org.apache.flink.runtime.taskmanager.Task
   [] -
BigQueryIO.Write/BatchLoads/JobIdCreationRoot_LOAD/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Splittable
ueryIO.Write/BatchLoads/CreateJobId_LOAD/ParMultiDo(Anonymous) ->
BigQueryIO.Write/BatchLoads/JobIdSideInput_LOAD/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
-> ToKeyedWorkItem (1/1) (bebac6c581d1b8ece88007ec0

java.lang.RuntimeException: Failed to cleanup global state.
  at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
  at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
  at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
  at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
  at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
  at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
  at org.apache.flink.streaming.run

Re: UnalignedFixedWindows - how to join to streams with unaligned fixed windows / full source code from the Streaming Systems book?

2020-10-06 Thread Kaymak, Tobias
Given: Two streams of events, A and B, each containing events for a given
key K. Events in A arrive a little later than those in B, but no more than
1 minute later.

Question: When both PCollections are windowed by session windows with a gap
duration of 10 minutes and CoGroupByKey is applied to join A with B, should
the session windows then overlap?
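
To make the question concrete, here is a small plain-Java sketch of the interval arithmetic behind session windows. It does not use the Beam API: the Interval class and the protoSession helper are hypothetical, and the timestamps are made up. It assumes each element starts a proto-session [timestamp, timestamp + gap) and that overlapping proto-sessions for the same key are merged into one spanning window. Whether the merge really happens across both inputs of the join is the open question above and is not settled by this sketch.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of session-style interval merging.
final class SessionOverlapSketch {

    /** Half-open interval [start, end). */
    static final class Interval {
        final Instant start;
        final Instant end;

        Interval(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        // Two half-open intervals overlap if each starts before the other ends.
        boolean overlaps(Interval other) {
            return start.isBefore(other.end) && other.start.isBefore(end);
        }

        // Smallest interval covering both inputs.
        Interval span(Interval other) {
            Instant s = start.isBefore(other.start) ? start : other.start;
            Instant e = end.isAfter(other.end) ? end : other.end;
            return new Interval(s, e);
        }
    }

    // Each element opens a proto-session that lasts for the gap duration.
    static Interval protoSession(Instant timestamp, Duration gap) {
        return new Interval(timestamp, timestamp.plus(gap));
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofMinutes(10);
        // Made-up timestamps: the event in A is one minute later than the one in B.
        Interval b = protoSession(Instant.parse("2020-10-06T12:00:00Z"), gap);
        Interval a = protoSession(Instant.parse("2020-10-06T12:01:00Z"), gap);

        System.out.println("overlap: " + a.overlaps(b));
        Interval merged = a.span(b);
        System.out.println("merged: [" + merged.start + ", " + merged.end + ")");
    }
}
```

With a delay of at most 1 minute and a gap of 10 minutes, the two proto-sessions always overlap in this model.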

On Mon, Oct 5, 2020 at 11:10 AM Kaymak, Tobias 
wrote:

> Hi Reuven,
> Thank you for your response.
>
> Yes, I've tested session windows with a gap of 10 minutes as I thought
> this should work in this scenario.
> However, I found that I still had articles/assets where the watermark
> might have been wrong. As I am relying on the assets being processed first
> (inserted into BigTable, followed by a fetch from BigTable for the whole
> history) I tried the following workaround that works (ran over the weekend
> for 72 hours of testing):
>
> As I'm reading from KafkaIO I am using a custom
> .withTimestampPolicyFactory(withEventTs) for the assets, in which I
> simply set a timestamp that is 1 minute earlier than the element's event
> timestamp (this is my allowed gap).
> The rest of the pipeline stays as-is, so the operational and logical
> overhead is kept at a minimum.
>
> On Fri, Oct 2, 2020 at 9:40 PM Reuven Lax  wrote:
>
>> Have you considered using Session windows? The window would start at the
>> timestamp of the article, and the Session gap duration would be the
>> (event-time) timeout after which you stop waiting for assets to join that
>> article.
>>
>> On Fri, Oct 2, 2020 at 3:05 AM Kaymak, Tobias 
>> wrote:
>>
>>> Hello,
>>>
>>> In chapter 4 of the Streaming Systems book by Tyler, Slava and Reuven
>>> there is an example 4-6 on page 111 about custom windowing that deals with
>>> UnalignedFixedWindows:
>>>
>>> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
>>>
>>> Unfortunately that example is abbreviated and the full source code is
>>> not published in this repo:
>>> https://github.com/takidau/streamingbook
>>>
>>> I am joining two Kafka Streams and I am currently windowing them by
>>> fixed time intervals. However the elements in stream one ("articles") are
>>> published first, then the assets for those articles are being published in
>>> the "assets" topic. Articles event timestamps are therefore slightly before
>>> those of assets.
>>>
>>> Now when doing a CoGroupByKey this can lead to a situation where an
>>> article is not being processed together with its assets, as
>>>
>>> - the article has a timestamp of 2020-10-02T00:30:29.997Z
>>> - the assets have a timestamp of 2020-10-02T00:30:30.001Z
>>>
>>> This is a must in my pipeline as I am relying on them to be processed
>>> together - otherwise I am publishing an article without its assets.
>>>
>>> My idea was therefore to apply UnalignedFixedWindows instead of fixed
>>> ones to the streams to circumvent this. What I am currently missing is the
>>> mergeWindows() implementation or the full source code to understand it.
>>> I am currently facing a java.lang.IllegalStateException
>>>
>>> TimestampCombiner moved element from 2020-10-02T09:32:36.079Z to earlier
>>> time 2020-10-02T09:32:03.365Z for window
>>> [2020-10-02T09:31:03.366Z..2020-10-02T09:32:03.366Z)
>>>
>>> Which gives me the impression that I am doing something wrong or have
>>> not fully understood the custom windowing topic.
>>>
>>> Am I on the wrong track here?


Re: UnalignedFixedWindows - how to join to streams with unaligned fixed windows / full source code from the Streaming Systems book?

2020-10-05 Thread Kaymak, Tobias
Hi Reuven,
Thank you for your response.

Yes, I've tested session windows with a gap of 10 minutes as I thought this
should work in this scenario.
However, I found that I still had articles/assets where the watermark might
have been wrong. As I am relying on the assets being processed first
(inserted into BigTable, followed by a fetch from BigTable for the whole
history) I tried the following workaround that works (ran over the weekend
for 72 hours of testing):

As I'm reading from KafkaIO I am using a custom
.withTimestampPolicyFactory(withEventTs) for the assets, in which I simply
set a timestamp that is 1 minute earlier than the element's event timestamp
(this is my allowed gap).
The rest of the pipeline stays as-is, so the operational and logical
overhead is kept at a minimum.
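
The timestamp arithmetic of this workaround can be sketched in plain Java. This is not the actual timestamp policy from the pipeline: the class name, the ALLOWED_GAP constant and the shiftedAssetTimestamp helper are hypothetical, and the KafkaIO wiring is left out. The example reuses the article and asset timestamps quoted further down in this thread to show that, after the shift, the asset's timestamp precedes the article's.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper showing only the shift itself, without any Beam types.
final class AssetTimestampShift {

    // The allowed gap of 1 minute mentioned in the message.
    static final Duration ALLOWED_GAP = Duration.ofMinutes(1);

    // Timestamp to assign to an asset: its event time moved back by the gap.
    static Instant shiftedAssetTimestamp(Instant eventTime) {
        return eventTime.minus(ALLOWED_GAP);
    }

    public static void main(String[] args) {
        Instant article = Instant.parse("2020-10-02T00:30:29.997Z");
        Instant asset = Instant.parse("2020-10-02T00:30:30.001Z");

        Instant shifted = shiftedAssetTimestamp(asset);
        System.out.println("shifted asset timestamp: " + shifted);
        System.out.println("asset now precedes article: " + shifted.isBefore(article));
    }
}
```

The same shift also moves the watermark for the assets topic back by the gap, which is worth keeping in mind when reasoning about lateness downstream.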

On Fri, Oct 2, 2020 at 9:40 PM Reuven Lax  wrote:

> Have you considered using Session windows? The window would start at the
> timestamp of the article, and the Session gap duration would be the
> (event-time) timeout after which you stop waiting for assets to join that
> article.
>
> On Fri, Oct 2, 2020 at 3:05 AM Kaymak, Tobias 
> wrote:
>
>> Hello,
>>
>> In chapter 4 of the Streaming Systems book by Tyler, Slava and Reuven
>> there is an example 4-6 on page 111 about custom windowing that deals with
>> UnalignedFixedWindows:
>>
>> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
>>
>> Unfortunately that example is abbreviated and the full source code is not
>> published in this repo:
>> https://github.com/takidau/streamingbook
>>
>> I am joining two Kafka Streams and I am currently windowing them by fixed
>> time intervals. However the elements in stream one ("articles") are
>> published first, then the assets for those articles are being published in
>> the "assets" topic. Articles event timestamps are therefore slightly before
>> those of assets.
>>
>> Now when doing a CoGroupByKey this can lead to a situation where an
>> article is not being processed together with its assets, as
>>
>> - the article has a timestamp of 2020-10-02T00:30:29.997Z
>> - the assets have a timestamp of 2020-10-02T00:30:30.001Z
>>
>> This is a must in my pipeline as I am relying on them to be processed
>> together - otherwise I am publishing an article without its assets.
>>
>> My idea was therefore to apply UnalignedFixedWindows instead of fixed
>> ones to the streams to circumvent this. What I am currently missing is the
>> mergeWindows() implementation or the full source code to understand it.
>> I am currently facing a java.lang.IllegalStateException
>>
>> TimestampCombiner moved element from 2020-10-02T09:32:36.079Z to earlier
>> time 2020-10-02T09:32:03.365Z for window
>> [2020-10-02T09:31:03.366Z..2020-10-02T09:32:03.366Z)
>>
>> Which gives me the impression that I am doing something wrong or have not
>> fully understood the custom windowing topic.
>>
>> Am I on the wrong track here?


Re: UnalignedFixedWindows - how to join to streams with unaligned fixed windows / full source code from the Streaming Systems book?

2020-10-02 Thread Kaymak, Tobias
Well, of course this does not fix the core problem.
What I can do is extend the FixedWindows class so that values arriving
within my measured "system latency" are still assigned to the previous
window. Or is there a smarter way to deal with this?

On Fri, Oct 2, 2020 at 4:11 PM Kaymak, Tobias 
wrote:

> This is what I came up with:
>
> https://gist.github.com/tkaymak/1f5eccf8633c18ab7f46f8ad01527630
>
> The first run looks okay (in my use case size and offset are the same),
> but I will need to add tests to prove my understanding of this.
>
> On Fri, Oct 2, 2020 at 12:05 PM Kaymak, Tobias 
> wrote:
>
>> Hello,
>>
>> In chapter 4 of the Streaming Systems book by Tyler, Slava and Reuven
>> there is an example 4-6 on page 111 about custom windowing that deals with
>> UnalignedFixedWindows:
>>
>> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
>>
>> Unfortunately that example is abbreviated and the full source code is not
>> published in this repo:
>> https://github.com/takidau/streamingbook
>>
>> I am joining two Kafka Streams and I am currently windowing them by fixed
>> time intervals. However the elements in stream one ("articles") are
>> published first, then the assets for those articles are being published in
>> the "assets" topic. Articles event timestamps are therefore slightly before
>> those of assets.
>>
>> Now when doing a CoGroupByKey this can lead to a situation where an
>> article is not being processed together with its assets, as
>>
>> - the article has a timestamp of 2020-10-02T00:30:29.997Z
>> - the assets have a timestamp of 2020-10-02T00:30:30.001Z
>>
>> This is a must in my pipeline as I am relying on them to be processed
>> together - otherwise I am publishing an article without its assets.
>>
>> My idea was therefore to apply UnalignedFixedWindows instead of fixed
>> ones to the streams to circumvent this. What I am currently missing is the
>> mergeWindows() implementation or the full source code to understand it.
>> I am currently facing a java.lang.IllegalStateException
>>
>> TimestampCombiner moved element from 2020-10-02T09:32:36.079Z to earlier
>> time 2020-10-02T09:32:03.365Z for window
>> [2020-10-02T09:31:03.366Z..2020-10-02T09:32:03.366Z)
>>
>> Which gives me the impression that I am doing something wrong or have not
>> fully understood the custom windowing topic.
>>
>> Am I on the wrong track here?


Re: UnalignedFixedWindows - how to join to streams with unaligned fixed windows / full source code from the Streaming Systems book?

2020-10-02 Thread Kaymak, Tobias
This is what I came up with:

https://gist.github.com/tkaymak/1f5eccf8633c18ab7f46f8ad01527630

The first run looks okay (in my use case size and offset are the same), but
I will need to add tests to prove my understanding of this.

On Fri, Oct 2, 2020 at 12:05 PM Kaymak, Tobias 
wrote:

> Hello,
>
> In chapter 4 of the Streaming Systems book by Tyler, Slava and Reuven
> there is an example 4-6 on page 111 about custom windowing that deals with
> UnalignedFixedWindows:
>
> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
>
> Unfortunately that example is abbreviated and the full source code is not
> published in this repo:
> https://github.com/takidau/streamingbook
>
> I am joining two Kafka Streams and I am currently windowing them by fixed
> time intervals. However the elements in stream one ("articles") are
> published first, then the assets for those articles are being published in
> the "assets" topic. Articles event timestamps are therefore slightly before
> those of assets.
>
> Now when doing a CoGroupByKey this can lead to a situation where an
> article is not being processed together with its assets, as
>
> - the article has a timestamp of 2020-10-02T00:30:29.997Z
> - the assets have a timestamp of 2020-10-02T00:30:30.001Z
>
> This is a must in my pipeline as I am relying on them to be processed
> together - otherwise I am publishing an article without its assets.
>
> My idea was therefore to apply UnalignedFixedWindows instead of fixed
> ones to the streams to circumvent this. What I am currently missing is the
> mergeWindows() implementation or the full source code to understand it. I
> am currently facing a java.lang.IllegalStateException
>
> TimestampCombiner moved element from 2020-10-02T09:32:36.079Z to earlier
> time 2020-10-02T09:32:03.365Z for window
> [2020-10-02T09:31:03.366Z..2020-10-02T09:32:03.366Z)
>
> Which gives me the impression that I am doing something wrong or have not
> fully understood the custom windowing topic.
>
> Am I on the wrong track here?


Re: Java/Flink - Flink's shaded Netty and Beam's Netty clash

2020-10-02 Thread Kaymak, Tobias
No, that was not the case. I'm still seeing this message when canceling a
pipeline. Sorry for the spam.
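
One generic way to see which Netty copy is actually loaded at runtime is to ask the JVM where a class came from. The sketch below is a diagnostic idea only, not something from this thread: the ClassOrigin class is hypothetical, and the two class names in main are assumptions (io.netty.buffer.ByteBuf for plain Netty, and the org.apache.flink.shaded.netty4 prefix for Flink's relocated copy). It would have to run inside the same classpath as the job to be meaningful.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: prints the jar a class was loaded from.
final class ClassOrigin {

    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Typical for classes loaded by the bootstrap class loader.
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    static String locationOf(String className) {
        try {
            // Load without initializing, so static initializers do not run.
            Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            return locationOf(type);
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Assumed class names; adjust to whatever the error message mentions.
        String[] candidates = {
            "io.netty.buffer.ByteBuf",
            "org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf"
        };
        for (String name : candidates) {
            System.out.println(name + " -> " + locationOf(name));
        }
    }
}
```

Comparing the printed jar names with the versions in the dependency tree shows whether the unshaded Netty on the classpath comes from the gRPC dependency or from somewhere else.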

On Fri, Oct 2, 2020 at 12:22 PM Kaymak, Tobias 
wrote:

> I think this was caused by having the flink-runner defined twice in my
> pom. Oo
> (once with scope runtime, and once without)
>
>
> On Fri, Oct 2, 2020 at 9:38 AM Kaymak, Tobias 
> wrote:
>
>> Sorry that I forgot to include the versions, currently I'm on Beam 2.23.0
>> / Flink 1.10.2 - I have a test dependency for cassandra (archinnov) which
>> should *not* be available at runtime, refers to netty and is included in
>> this tree, but the other two places where I find netty are Flink and
>> beam-sdks-java-io-google-cloud-platform -> io.grpc:grpc-netty 1.27.2
>>
>> Stupid question: How can I check which version Flink 1.10.2 is expecting
>> in the runtime?
>>
>> output of mvn -Pflink-runner dependency:tree
>>
>> --- maven-dependency-plugin:2.8:tree (default-cli) ---
>> ch.ricardo.di:di-beam:jar:2.11.0
>> +- org.apache.beam:beam-sdks-java-core:jar:2.23.0:compile
>> |  +- org.apache.beam:beam-model-pipeline:jar:2.23.0:compile
>> |  |  +- com.google.errorprone:error_prone_annotations:jar:2.3.3:compile
>> |  |  +- commons-logging:commons-logging:jar:1.2:compile
>> |  |  +- org.apache.logging.log4j:log4j-api:jar:2.6.2:compile
>> |  |  \- org.conscrypt:conscrypt-openjdk-uber:jar:1.3.0:compile
>> |  +- org.apache.beam:beam-model-job-management:jar:2.23.0:compile
>> |  +- org.apache.beam:beam-vendor-bytebuddy-1_10_8:jar:0.1:compile
>> |  +- org.apache.beam:beam-vendor-grpc-1_26_0:jar:0.3:compile
>> |  +- org.apache.beam:beam-vendor-guava-26_0-jre:jar:0.1:compile
>> |  +- com.google.code.findbugs:jsr305:jar:3.0.2:compile
>> |  +- com.fasterxml.jackson.core:jackson-core:jar:2.10.2:compile
>> |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.10.2:compile
>> |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.10.2:compile
>> |  +- org.apache.avro:avro:jar:1.8.2:compile
>> |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
>> |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
>> |  |  \- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
>> |  +- org.xerial.snappy:snappy-java:jar:1.1.4:compile
>> |  \- org.tukaani:xz:jar:1.8:compile
>> +-
>> org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.23.0:compile
>> |  +- org.apache.beam:beam-sdks-java-expansion-service:jar:2.23.0:compile
>> |  |  \- org.apache.beam:beam-model-fn-execution:jar:2.23.0:compile
>> |  +-
>> org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:jar:2.23.0:compile
>> |  |  +- com.google.cloud.bigdataoss:gcsio:jar:2.1.3:compile
>> |  |  \-
>> com.google.apis:google-api-services-cloudresourcemanager:jar:v1-rev20200311-1.30.9:compile
>> |  +- com.google.cloud.bigdataoss:util:jar:2.1.3:compile
>> |  |  +- com.google.api-client:google-api-client-java6:jar:1.30.9:compile
>> |  |  +-
>> com.google.api-client:google-api-client-jackson2:jar:1.30.9:compile
>> |  |  +-
>> com.google.oauth-client:google-oauth-client-java6:jar:1.30.6:compile
>> |  |  +- com.google.flogger:google-extensions:jar:0.5.1:compile
>> |  |  |  \- com.google.flogger:flogger:jar:0.5.1:compile
>> |  |  \- com.google.flogger:flogger-system-backend:jar:0.5.1:runtime
>> |  | \- org.checkerframework:checker-compat-qual:jar:2.5.3:runtime
>> |  +- com.google.api:gax:jar:1.54.0:compile
>> |  |  \- org.threeten:threetenbp:jar:1.4.0:compile
>> |  +- com.google.api:gax-grpc:jar:1.54.0:compile
>> |  |  \- io.grpc:grpc-protobuf:jar:1.27.2:compile
>> |  | \- io.grpc:grpc-protobuf-lite:jar:1.27.2:compile
>> |  +-
>> com.google.apis:google-api-services-healthcare:jar:v1beta1-rev20200525-1.30.9:compile
>> |  +- com.google.auth:google-auth-library-credentials:jar:0.19.0:compile
>> |  +- com.google.auth:google-auth-library-oauth2-http:jar:0.19.0:compile
>> |  +-
>> com.google.cloud:google-cloud-bigquerystorage:jar:0.125.0-beta:compile
>> |  |  +-
>> com.google.api.grpc:proto-google-cloud-bigquerystorage-v1alpha2:jar:0.90.0:compile
>> |  |  +-
>> com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2:jar:0.90.0:compile
>> |  |  \-
>> com.google.api.grpc:proto-google-cloud-bigquerystorage-v1:jar:0.90.0:compile
>> |  +- com.google.cloud.bigtable:bigtable-client-core:jar:1.13.0:compile
>> |  |  +- com.google.cloud:google-cloud-bigtable:jar:1.9.1:compile
>> |  |  +- com.google.api.grpc:grpc-google-common-protos:jar:1.17.0:compile
>> |  |  +-
>> com.google.api.grpc:grpc

Re: Java/Flink - Flink's shaded Netty and Beam's Netty clash

2020-10-02 Thread Kaymak, Tobias
I think this was caused by having the flink-runner defined twice in my pom.
Oo
(once with scope runtime, and once without)


On Fri, Oct 2, 2020 at 9:38 AM Kaymak, Tobias 
wrote:

> Sorry that I forgot to include the versions, currently I'm on Beam 2.23.0
> / Flink 1.10.2 - I have a test dependency for cassandra (archinnov) which
> should *not* be available at runtime, refers to netty and is included in
> this tree, but the other two places where I find netty are Flink and
> beam-sdks-java-io-google-cloud-platform -> io.grpc:grpc-netty 1.27.2
>
> Stupid question: How can I check which version Flink 1.10.2 is expecting
> in the runtime?
>
> output of mvn -Pflink-runner dependency:tree
>
> --- maven-dependency-plugin:2.8:tree (default-cli) ---
> ch.ricardo.di:di-beam:jar:2.11.0
> +- org.apache.beam:beam-sdks-java-core:jar:2.23.0:compile
> |  +- org.apache.beam:beam-model-pipeline:jar:2.23.0:compile
> |  |  +- com.google.errorprone:error_prone_annotations:jar:2.3.3:compile
> |  |  +- commons-logging:commons-logging:jar:1.2:compile
> |  |  +- org.apache.logging.log4j:log4j-api:jar:2.6.2:compile
> |  |  \- org.conscrypt:conscrypt-openjdk-uber:jar:1.3.0:compile
> |  +- org.apache.beam:beam-model-job-management:jar:2.23.0:compile
> |  +- org.apache.beam:beam-vendor-bytebuddy-1_10_8:jar:0.1:compile
> |  +- org.apache.beam:beam-vendor-grpc-1_26_0:jar:0.3:compile
> |  +- org.apache.beam:beam-vendor-guava-26_0-jre:jar:0.1:compile
> |  +- com.google.code.findbugs:jsr305:jar:3.0.2:compile
> |  +- com.fasterxml.jackson.core:jackson-core:jar:2.10.2:compile
> |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.10.2:compile
> |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.10.2:compile
> |  +- org.apache.avro:avro:jar:1.8.2:compile
> |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
> |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
> |  |  \- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
> |  +- org.xerial.snappy:snappy-java:jar:1.1.4:compile
> |  \- org.tukaani:xz:jar:1.8:compile
> +-
> org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.23.0:compile
> |  +- org.apache.beam:beam-sdks-java-expansion-service:jar:2.23.0:compile
> |  |  \- org.apache.beam:beam-model-fn-execution:jar:2.23.0:compile
> |  +-
> org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:jar:2.23.0:compile
> |  |  +- com.google.cloud.bigdataoss:gcsio:jar:2.1.3:compile
> |  |  \-
> com.google.apis:google-api-services-cloudresourcemanager:jar:v1-rev20200311-1.30.9:compile
> |  +- com.google.cloud.bigdataoss:util:jar:2.1.3:compile
> |  |  +- com.google.api-client:google-api-client-java6:jar:1.30.9:compile
> |  |  +-
> com.google.api-client:google-api-client-jackson2:jar:1.30.9:compile
> |  |  +-
> com.google.oauth-client:google-oauth-client-java6:jar:1.30.6:compile
> |  |  +- com.google.flogger:google-extensions:jar:0.5.1:compile
> |  |  |  \- com.google.flogger:flogger:jar:0.5.1:compile
> |  |  \- com.google.flogger:flogger-system-backend:jar:0.5.1:runtime
> |  | \- org.checkerframework:checker-compat-qual:jar:2.5.3:runtime
> |  +- com.google.api:gax:jar:1.54.0:compile
> |  |  \- org.threeten:threetenbp:jar:1.4.0:compile
> |  +- com.google.api:gax-grpc:jar:1.54.0:compile
> |  |  \- io.grpc:grpc-protobuf:jar:1.27.2:compile
> |  | \- io.grpc:grpc-protobuf-lite:jar:1.27.2:compile
> |  +-
> com.google.apis:google-api-services-healthcare:jar:v1beta1-rev20200525-1.30.9:compile
> |  +- com.google.auth:google-auth-library-credentials:jar:0.19.0:compile
> |  +- com.google.auth:google-auth-library-oauth2-http:jar:0.19.0:compile
> |  +-
> com.google.cloud:google-cloud-bigquerystorage:jar:0.125.0-beta:compile
> |  |  +-
> com.google.api.grpc:proto-google-cloud-bigquerystorage-v1alpha2:jar:0.90.0:compile
> |  |  +-
> com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2:jar:0.90.0:compile
> |  |  \-
> com.google.api.grpc:proto-google-cloud-bigquerystorage-v1:jar:0.90.0:compile
> |  +- com.google.cloud.bigtable:bigtable-client-core:jar:1.13.0:compile
> |  |  +- com.google.cloud:google-cloud-bigtable:jar:1.9.1:compile
> |  |  +- com.google.api.grpc:grpc-google-common-protos:jar:1.17.0:compile
> |  |  +-
> com.google.api.grpc:grpc-google-cloud-bigtable-v2:jar:1.9.1:compile
> |  |  +-
> com.google.api.grpc:proto-google-cloud-bigtable-admin-v2:jar:1.9.1:compile
> |  |  +-
> com.google.api.grpc:grpc-google-cloud-bigtable-admin-v2:jar:1.9.1:compile
> |  |  +- com.google.api.grpc:proto-google-iam-v1:jar:0.13.0:compile
> |  |  +- io.opencensus:opencensus-contrib-grpc-util:jar:0.24.0:compile
> |  |  +- io.dropwizard.metrics:metrics-core:jar:3.2.6:compile
> |  |  \- commons-codec:commons-codec:j

UnalignedFixedWindows - how to join to streams with unaligned fixed windows / full source code from the Streaming Systems book?

2020-10-02 Thread Kaymak, Tobias
Hello,

In chapter 4 of the Streaming Systems book by Tyler, Slava and Reuven there
is an example 4-6 on page 111 about custom windowing that deals with
UnalignedFixedWindows:
https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html

Unfortunately that example is abbreviated and the full source code is not
published in this repo:
https://github.com/takidau/streamingbook

I am joining two Kafka streams and I am currently windowing them by fixed
time intervals. However, the elements in stream one ("articles") are
published first, then the assets for those articles are published in the
"assets" topic. The articles' event timestamps are therefore slightly
before those of the assets.

Now when doing a CoGroupByKey this can lead to a situation where an article
is not being processed together with its assets, as

- the article has a timestamp of 2020-10-02T00:30:29.997Z
- the assets have a timestamp of 2020-10-02T00:30:30.001Z

This is a must in my pipeline as I am relying on them to be processed
together - otherwise I am publishing an article without its assets.
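
The boundary effect can be reproduced with a few lines of plain Java. This is a sketch under an assumption: the window size is not stated in this message, so 30 seconds is used purely for illustration, and windowStart is a hypothetical helper that computes the start of an epoch-aligned fixed window as timestamp - (timestamp mod size). With that size, the two timestamps above land in different windows although they are only 4 ms apart.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of epoch-aligned fixed-window assignment.
final class FixedWindowBoundaryDemo {

    // Start of the aligned fixed window that contains the given timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long tsMillis = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(tsMillis - Math.floorMod(tsMillis, sizeMillis));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(30); // assumed, not from the thread
        Instant article = Instant.parse("2020-10-02T00:30:29.997Z");
        Instant asset = Instant.parse("2020-10-02T00:30:30.001Z");

        System.out.println("article window starts at " + windowStart(article, size));
        System.out.println("asset window starts at   " + windowStart(asset, size));
    }
}
```

Any fixed window size has boundaries like this, so a larger window only makes the split rarer and does not remove it.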

My idea was therefore to apply UnalignedFixedWindows instead of fixed ones
to the streams to circumvent this. What I am currently missing is the
mergeWindows() implementation or the full source code to understand it. I
am currently facing a java.lang.IllegalStateException

TimestampCombiner moved element from 2020-10-02T09:32:36.079Z to earlier
time 2020-10-02T09:32:03.365Z for window
[2020-10-02T09:31:03.366Z..2020-10-02T09:32:03.366Z)

Which gives me the impression that I am doing something wrong or have not
fully understood the custom windowing topic.
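
A quick way to read the exception above is to check whether the element's timestamp lies inside the window it was assigned to. The sketch below uses plain Java and the values from the message. It assumes that Beam's interval windows are half-open and that the end-of-window timestamp combiner outputs the window's last millisecond, which would explain the "earlier time" in the message. The class and method names are hypothetical.

```java
import java.time.Instant;

class WindowContainmentCheck {
    /** Half-open interval check: start <= ts < end. */
    static boolean contains(Instant start, Instant end, Instant ts) {
        return !ts.isBefore(start) && ts.isBefore(end);
    }

    /** Last timestamp inside [start, end) at millisecond precision. */
    static Instant maxTimestamp(Instant end) {
        return end.minusMillis(1);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-10-02T09:31:03.366Z");
        Instant end = Instant.parse("2020-10-02T09:32:03.366Z");
        Instant element = Instant.parse("2020-10-02T09:32:36.079Z");

        // The element is later than the window end, so it is not contained.
        System.out.println("contained: " + contains(start, end, element));
        // The window's last millisecond matches the "earlier time" in the message.
        System.out.println("max timestamp: " + maxTimestamp(end));
    }
}
```

If the check is right, the window assignment (or merge) step produced a window that does not contain the element's own timestamp, which is worth verifying in the custom WindowFn before looking elsewhere.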

Am I on the wrong track here?


Re: Java/Flink - Flink's shaded Netty and Beam's Netty clash

2020-10-02 Thread Kaymak, Tobias
3-1.30.9:runtime
|  +-
com.google.apis:google-api-services-dataflow:jar:v1b3-rev20200305-1.30.9:runtime
|  \-
com.google.apis:google-api-services-storage:jar:v1-rev20200226-1.30.9:compile
\- org.apache.beam:beam-runners-flink-1.10:jar:2.23.0:runtime
   +- org.apache.beam:beam-runners-core-java:jar:2.23.0:runtime
   |  \- org.apache.beam:beam-sdks-java-fn-execution:jar:2.23.0:compile
   +- org.apache.beam:beam-runners-java-fn-execution:jar:2.23.0:compile
   |  \-
org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:jar:2.23.0:compile
   +- org.apache.beam:beam-runners-java-job-service:jar:2.23.0:runtime
   +- org.apache.flink:flink-clients_2.11:jar:1.10.1:runtime
   |  +- org.apache.flink:flink-optimizer_2.11:jar:1.10.1:runtime
   |  +- commons-cli:commons-cli:jar:1.3.1:runtime
   |  \- org.apache.flink:force-shading:jar:1.10.1:runtime
   +- org.apache.flink:flink-core:jar:1.10.1:runtime
   |  +- org.apache.flink:flink-annotations:jar:1.10.1:runtime
   |  +- org.apache.flink:flink-shaded-asm-7:jar:7.1-9.0:runtime
   |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:runtime
   |  |  \- com.esotericsoftware.minlog:minlog:jar:1.2:runtime
   |  \- org.apache.flink:flink-shaded-guava:jar:18.0-9.0:runtime
   +- org.apache.flink:flink-metrics-core:jar:1.10.1:runtime
   +- org.apache.flink:flink-java:jar:1.10.1:runtime
   |  \- org.apache.commons:commons-math3:jar:3.5:runtime
   +- org.apache.flink:flink-runtime_2.11:jar:1.10.1:runtime
   |  +-
org.apache.flink:flink-queryable-state-client-java:jar:1.10.1:runtime
   |  +- org.apache.flink:flink-hadoop-fs:jar:1.10.1:runtime
   |  +- org.apache.flink:flink-shaded-netty:jar:4.1.39.Final-9.0:runtime
   |  +- org.apache.flink:flink-shaded-jackson:jar:2.10.1-9.0:runtime
   |  +- org.javassist:javassist:jar:3.24.0-GA:runtime
   |  +- org.scala-lang:scala-library:jar:2.11.12:runtime
   |  +- com.typesafe.akka:akka-actor_2.11:jar:2.5.21:runtime
   |  |  +- com.typesafe:config:jar:1.3.3:runtime
   |  |  \- org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:runtime
   |  +- com.typesafe.akka:akka-stream_2.11:jar:2.5.21:runtime
   |  |  +- org.reactivestreams:reactive-streams:jar:1.0.2:runtime
   |  |  \- com.typesafe:ssl-config-core_2.11:jar:0.3.7:runtime
   |  | \-
org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.1.1:runtime
   |  +- com.typesafe.akka:akka-protobuf_2.11:jar:2.5.21:runtime
   |  +- com.typesafe.akka:akka-slf4j_2.11:jar:2.5.21:runtime
   |  +- org.clapper:grizzled-slf4j_2.11:jar:1.3.2:runtime
   |  +- com.github.scopt:scopt_2.11:jar:3.5.0:runtime
   |  \- com.twitter:chill_2.11:jar:0.7.6:runtime
   | \- com.twitter:chill-java:jar:0.7.6:runtime
   \- org.apache.flink:flink-streaming-java_2.11:jar:1.10.1:runtime


On Thu, Oct 1, 2020 at 5:23 PM Kyle Weaver  wrote:

> Can you provide your beam and flink versions as well?
>
> On Thu, Oct 1, 2020 at 5:59 AM Tomo Suzuki  wrote:
>
>> To fix the problem we need to identify which JAR file contains
>> io.grpc.netty.shaded.io.netty.util.collection.IntObjectHashMap.  Can you
>> check which version of which artifact (I suspect io.grpc:grpc-netty) has
>> the class in your runtime?
>>
>> As far as I know, Beam's vendored (shaded) class files have the package
>> name "org.apache.beam.vendor" prefix.
>>
>> On Thu, Oct 1, 2020 at 3:48 AM Kaymak, Tobias 
>> wrote:
>>
>>> Hello,
>>>
>>> when deploying a Beam streaming pipeline on Flink and canceling it after
>>> some time, the following can be seen in the logs:
>>>
>>> 2020-10-01 07:36:47,605 WARN
>>>  io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop- Unexpected
>>> exception in the selector loop.
>>> flink-taskmanager-7695c66775-xtz4l taskmanager
>>> java.lang.NoClassDefFoundError:
>>> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$2
>>> flink-taskmanager-7695c66775-xtz4l taskmanager  at
>>> io.grpc.netty.shaded.io.netty.util.collection.IntObjectHashMap.values(IntObjectHashMap.java:221)
>>> flink-taskmanager-7695c66775-xtz4l taskmanager  at
>>> io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.closeAll(EpollEventLoop.java:436)
>>> flink-taskmanager-7695c66775-xtz4l taskmanager  at
>>> io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:401)
>>> flink-taskmanager-7695c66775-xtz4l taskmanager  at
>>> io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
>>> flink-taskmanager-7695c66775-xtz4l taskmanager  at
>>> io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>> flink-taskmanager-7695c66775-xtz4l taskmanager  at
>>> io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.ru

Java/Flink - Flink's shaded Netty and Beam's Netty clash

2020-10-01 Thread Kaymak, Tobias
Hello,

when deploying a Beam streaming pipeline on Flink and canceling it after
some time, the following can be seen in the logs:

2020-10-01 07:36:47,605 WARN
 io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop- Unexpected
exception in the selector loop.
flink-taskmanager-7695c66775-xtz4l taskmanager
java.lang.NoClassDefFoundError:
io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$2
flink-taskmanager-7695c66775-xtz4l taskmanager  at
io.grpc.netty.shaded.io.netty.util.collection.IntObjectHashMap.values(IntObjectHashMap.java:221)
flink-taskmanager-7695c66775-xtz4l taskmanager  at
io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.closeAll(EpollEventLoop.java:436)
flink-taskmanager-7695c66775-xtz4l taskmanager  at
io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:401)
flink-taskmanager-7695c66775-xtz4l taskmanager  at
io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
flink-taskmanager-7695c66775-xtz4l taskmanager  at
io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
flink-taskmanager-7695c66775-xtz4l taskmanager  at
io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
flink-taskmanager-7695c66775-xtz4l taskmanager  at
java.lang.Thread.run(Thread.java:748)

A mvn dependency:tree reveals that

org.apache.beam:beam-sdks-java-io-google-cloud-platform

and the Flink runner itself both pull in Netty, whereas Flink uses a shaded
one - if I get this right.

What is the best way to fix this?
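
As suggested in the reply to this thread, a first step is to find out which JAR provides the class at runtime. The following is a minimal sketch in plain Java, assuming it is run with the same classpath as the job. The class name `ClassOrigin` and the method `describe` are hypothetical helpers, not part of Beam or Flink.

```java
import java.security.CodeSource;

class ClassOrigin {
    /** Describes where a class was loaded from, or why it could not be loaded. */
    static String describe(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class<?> c = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            return src == null
                ? "bootstrap class loader (JDK)"
                : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // The class named in the stack trace; pass another name as an argument to check it.
        String name = args.length > 0
            ? args[0]
            : "io.grpc.netty.shaded.io.netty.util.collection.IntObjectHashMap";
        System.out.println(name + " <- " + describe(name));
    }
}
```

The printed location, if any, can then be compared with the output of mvn dependency:tree to see which artifact and version actually ends up on the classpath.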


Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-08-27 Thread Kaymak, Tobias
Hi Brian,

Thank you for opening the issue! My current workaround is to generate a
BigQuery schema with helper functions I already have (since I am writing to
BigQuery in the end in my sink). I still have the Beam Schema functions in
the code, but I currently don't use them as I couldn't make them work in
time for a company-internal demo. (So I am basically following your advice
and trying to avoid the ProtoMessageSchema.)

Best,
Tobi

On Wed, Aug 19, 2020 at 10:52 PM Brian Hulette  wrote:

> It looks like this is occurring because we don't actually support mixing
> SchemaProviders in nested types. The current SchemaProvider implementations
> only support nested types for homogenous types (e.g. an AutoValue with an
> AutoValue field). So when you use JavaFieldSchema as the SchemaProvider for
> the outer type (EnrichedArticle), it is also used recursively for the inner
> type (ArticleEnvelope), rather than using the registered ProtoMessageSchema.
>
> I filed BEAM-10765 [1] to add support for inferring schemas for
> non-homogenous types, I think it's something we should be able to support.
> I know it's been a while since you reported this, have you found a
> workaround in the meantime? Your best bet may be to avoid using
> ProtoMessageSchema for the inner class for now and use the same style of
> class for the outer and inner class by just creating a POJO or AutoValue
> that replicates the ArticleEnvelope class.
>
>
> Luke: Regarding recursive schemas, Reuven and I have had some discussions
> about it offline. I think he said it should be feasible but I don't know
> much beyond that.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-10765
>
> On Tue, Jun 30, 2020 at 2:10 AM Kaymak, Tobias 
> wrote:
>
>> I want to make my example as simple as possible while also not leaving
>> out the details that might be the reason for the error. I don't think there
>> is any recursiveness.
>> I can also share the ArticleEnvelope Protobuf file if that helps. I've
>> tried to register the ArticleEnvelope schema like this:
>>
>> TestPipeline p = TestPipeline.create();
>> TypeDescriptor<ArticleProto.ArticleEnvelope> articleEnvelopeTypeDescriptor =
>>     TypeDescriptor.of(ArticleProto.ArticleEnvelope.class);
>> Schema articleSchema =
>>     new ProtoMessageSchema()
>>         .schemaFor(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>
>> SerializableFunction<ArticleProto.ArticleEnvelope, Row> articleEnvelopeToRow =
>>     new ProtoMessageSchema()
>>         .toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>
>> SerializableFunction<Row, ArticleProto.ArticleEnvelope> articleEnvelopeFromRow =
>>     new ProtoMessageSchema()
>>         .fromRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>
>> p.getSchemaRegistry().registerSchemaForClass(
>>     ArticleProto.ArticleEnvelope.class,
>>     articleSchema, articleEnvelopeToRow, articleEnvelopeFromRow);
>>
>> The problem is that even when I define and register it like above, as
>> soon as I annotate the class EnrichedArticle with 
>> @DefaultSchema(JavaFieldSchema.class)
>> I get:
>>
>> Caused by: java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>> from class
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO
>> at
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO.create(Unknown
>> Source)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>> at
>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>> at
>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCode

Running a Beam batch job in Flink runner leads to a successful outcome, but Flink does not consider it "Completed"

2020-07-23 Thread Kaymak, Tobias
Hello,

After running a successful batch job on Flink 1.10.1 with Beam 2.22.0 I was
expecting to see the job in the "completed" section of the Flink
web interface. That was not the case. The following log of the
taskmanager at DEBUG level shows that something within the shutdown of the
job might have gone wrong - it looks like this is a pure Flink issue to me:

2020-07-23 04:33:15,992 DEBUG org.apache.beam.sdk.io.cassandra.CassandraIO
 - Waiting for a batch of 100 Cassandra writes to be
executed...
2020-07-23 04:33:16,001 DEBUG org.apache.beam.sdk.io.cassandra.CassandraIO
 - Waiting for a batch of 100 Cassandra writes to be
executed...
2020-07-23 04:33:16,010 DEBUG org.apache.beam.sdk.io.cassandra.CassandraIO
 - Waiting for a batch of 100 Cassandra writes to be
executed...
2020-07-23 04:33:16,018 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition  -
ReleaseOnConsumptionResultPartition
978f00b319eb407349b22999d5930c1a@a66baf0819fc88d57b303927c170
[PIPELINED, 1 subpartitions, 0 pending consumptions]: Received consumed
notification for subpartition 0.
2020-07-23 04:33:16,018 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager  -
Received consume notification from ReleaseOnConsumptionResultPartition
978f00b319eb407349b22999d5930c1a@a66baf0819fc88d57b303927c170
[PIPELINED, 1 subpartitions, 0 pending consumptions].
2020-07-23 04:33:16,018 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition  - FlatMap
(FlatMap at
BigQueryIO.TypedRead/PassThroughThenCleanup/ParMultiDo(Identity).out0)
(1/1) (a66baf0819fc88d57b303927c170): Releasing
ReleaseOnConsumptionResultPartition
978f00b319eb407349b22999d5930c1a@a66baf0819fc88d57b303927c170
[PIPELINED, 1 subpartitions, 0 pending consumptions].
2020-07-23 04:33:16,018 DEBUG
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition  -
FlatMap (FlatMap at
BigQueryIO.TypedRead/PassThroughThenCleanup/ParMultiDo(Identity).out0)
(1/1) (a66baf0819fc88d57b303927c170): Released PipelinedSubpartition#0
[number of buffers: 10653067 (349079658869 bytes), number of buffers in
backlog: 0, finished? true, read view? false].
2020-07-23 04:33:16,018 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager  -
Released partition 978f00b319eb407349b22999d5930c1a produced by
a66baf0819fc88d57b303927c170.
2020-07-23 04:33:16,022 DEBUG com.datastax.driver.core.Connection
-
Connection[cluster1-dc1-service.cass-operator.svc.cluster.local/10.70.67.5:9042-1,
inFlight=0, closed=true] closing connection
2020-07-23 04:33:16,022 DEBUG com.datastax.driver.core.Host.STATES
 - [cluster1-dc1-service.cass-operator.svc.cluster.local/
10.70.67.5:9042]
Connection[cluster1-dc1-service.cass-operator.svc.cluster.local/10.70.67.5:9042-1,
inFlight=0, closed=true] closed, remaining = 0
2020-07-23 04:33:16,023 DEBUG com.datastax.driver.core.Connection
-
Connection[cluster1-dc1-service.cass-operator.svc.cluster.local/10.70.69.3:9042-1,
inFlight=0, closed=true] closing connection
2020-07-23 04:33:16,023 DEBUG com.datastax.driver.core.Host.STATES
 - [cluster1-dc1-service.cass-operator.svc.cluster.local/
10.70.69.3:9042]
Connection[cluster1-dc1-service.cass-operator.svc.cluster.local/10.70.69.3:9042-1,
inFlight=0, closed=true] closed, remaining = 0
2020-07-23 04:33:16,027 DEBUG com.datastax.driver.core.Connection
-
Connection[cluster1-dc1-service.cass-operator.svc.cluster.local/10.70.68.3:9042-2,
inFlight=0, closed=true] closing connection
2020-07-23 04:33:16,027 DEBUG com.datastax.driver.core.Host.STATES
 - [cluster1-dc1-service.cass-operator.svc.cluster.local/
10.70.68.3:9042]
Connection[cluster1-dc1-service.cass-operator.svc.cluster.local/10.70.68.3:9042-2,
inFlight=0, closed=true] closed, remaining = 1
2020-07-23 04:33:16,036 DEBUG com.datastax.driver.core.Cluster
 - Shutting down
2020-07-23 04:33:16,037 DEBUG com.datastax.driver.core.Connection
-
Connection[cluster1-dc1-service.cass-operator.svc.cluster.local/10.70.68.3:9042-1,
inFlight=0, closed=true] closing connection
2020-07-23 04:33:16,037 DEBUG com.datastax.driver.core.Host.STATES
 - [cluster1-dc1-service.cass-operator.svc.cluster.local/
10.70.68.3:9042]
Connection[cluster1-dc1-service.cass-operator.svc.cluster.local/10.70.68.3:9042-1,
inFlight=0, closed=true] closed, remaining = 0
2020-07-23 04:33:16,706 DEBUG com.datastax.driver.core.RequestHandler
- onTimeout triggered but the response was completed by
another thread, cancelling (retryCount = 0, queryState =
QueryState(count=0, inProgress=false, cancelled=false), queryStateRef =
QueryState(count=0, inProgress=false, cancelled=false))
2020-07-23 04:33:17,404 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from 71a048c9dccd2470373e40752eacd515.
2020-07-23 04:33:17,5

Re: CassandraIO - random failures in batch mode while writing - how to recover?

2020-07-23 Thread Kaymak, Tobias
Ok, so the easiest way out of this was to set the consistency level to
`LOCAL_QUORUM` in CassandraIO. With that setting everything went smoothly.
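
For context, `LOCAL_QUORUM` requires a majority of the replicas in the local datacenter to acknowledge each write. The sketch below only shows that arithmetic. The thread does not state the keyspace's replication factor, so the values in the loop are assumptions, and `QuorumSketch` is a hypothetical name. To my knowledge, the level itself is set in Beam with `withConsistencyLevel("LOCAL_QUORUM")` on the `CassandraIO.write()` transform.

```java
class QuorumSketch {
    /** Replicas that must acknowledge for a quorum, given a replication factor. */
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    public static void main(String[] args) {
        // Assumed replication factors, for illustration only.
        for (int rf = 1; rf <= 5; rf++) {
            System.out.println("RF=" + rf + " -> quorum=" + quorum(rf));
        }
    }
}
```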

On Thu, Jul 16, 2020 at 9:09 PM Kaymak, Tobias 
wrote:

> Hello,
>
> I am trying to load a table that is about 200 GiB in size in BigQuery to
> Cassandra via a batch job in Beam 2.22.0 on Flink 1.10.1 - the job runs but
> fails at random points in time throwing different errors each time - and
> not always at the same points in the data (which comes in pretty clean from
> BigQuery as far as I can see).
>
> I can see here that there seems to be an invalid query string  (log level
> is debug) - do I have a chance to set a retry strategy for the CassandraIO
> or is there another way to deal with this situation?
>
>  2020-07-16 16:22:21,926 ERROR
> org.apache.flink.runtime.operators.BatchTask  - Error in
> task code:  CHAIN MapPartition (MapPartition at
> CassandraIO.Write/ParDo(Write)/ParMultiDo(Write)) -> FlatMap (FlatMap at
> CassandraIO.Write/ParDo(Write)/ParMultiDo(Write).output) (2/2)
> org.apache.beam.sdk.util.UserCodeException:
> java.util.concurrent.ExecutionException:
> com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
> validate.
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
> at
> org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:143)
> at
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
> validate.
> at
> com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:526)
> at
> com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:487)
> at
> com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
> at
> org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.waitForFuturesToFinish(CassandraIO.java:1169)
> at
> org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.mutate(CassandraIO.java:1148)
> at
> org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn.processElement(CassandraIO.java:1034)
> Caused by: com.datastax.driver.core.exceptions.InvalidQueryException:
> String didn't validate.
> at
> com.datastax.driver.core.Responses$Error.asException(Responses.java:181)
> at
> com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:215)
> at
> com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
> at
> com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:61)
> at
> com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:1005)
> at
> com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:808)
> at
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1262)
> at
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1180)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Abstract

CassandraIO - random failures in batch mode while writing - how to recover?

2020-07-16 Thread Kaymak, Tobias
Hello,

I am trying to load a table that is about 200 GiB in size in BigQuery to
Cassandra via a batch job in Beam 2.22.0 on Flink 1.10.1 - the job runs but
fails at random points in time throwing different errors each time - and
not always at the same points in the data (which comes in pretty clean from
BigQuery as far as I can see).

I can see here that there seems to be an invalid query string  (log level
is debug) - do I have a chance to set a retry strategy for the CassandraIO
or is there another way to deal with this situation?

 2020-07-16 16:22:21,926 ERROR org.apache.flink.runtime.operators.BatchTask
 - Error in task code:  CHAIN MapPartition (MapPartition at
CassandraIO.Write/ParDo(Write)/ParMultiDo(Write)) -> FlatMap (FlatMap at
CassandraIO.Write/ParDo(Write)/ParMultiDo(Write).output) (2/2)
org.apache.beam.sdk.util.UserCodeException:
java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
validate.
at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at
org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:143)
at
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
validate.
at
com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:526)
at
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:487)
at
com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.waitForFuturesToFinish(CassandraIO.java:1169)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.mutate(CassandraIO.java:1148)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn.processElement(CassandraIO.java:1034)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException:
String didn't validate.
at
com.datastax.driver.core.Responses$Error.asException(Responses.java:181)
at
com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:215)
at
com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
at
com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:61)
at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:1005)
at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:808)
at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1262)
at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1180)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Abstra

Running a batch pipeline on the Classic Java Flink Runner - pipeline starts, but shell blocks

2020-07-16 Thread Kaymak, Tobias
Hello,

I have a batch pipeline with Beam 2.22.0 reading about 200 GiB from
BigQuery, mapping the data and writing it out via CassandraIO.

When I run the pipeline via the Classic Java Flink runner on a 1.10.1 Flink
cluster I face the following issue:

When launching the pipeline via

bin/flink run -d -c test.beam.BigQueryToCassandra -j
/mnt/pipelines/beam_pipelines.jar --runner=FlinkRunner --appName=backload
--numberOfExecutionRetries=20 --executionRetryDelay=1 --project=XXX
--parallelism=1

The shell then returns after some minutes and throws:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Pipeline execution failed
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.RuntimeException: Pipeline execution failed
at
org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:90)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at
ch.ricardo.di.beam.BigQueryToCassandra.main(BigQueryToCassandra.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: java.lang.RuntimeException:
java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
JobGraph.
at
org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:290)
at
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:952)
at
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:84)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:53)
at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:150)
at
org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:87)
... 16 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
JobGraph.
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:947)
... 20 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit JobGraph.
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.ja

Re: Registering Protobuf schema

2020-07-12 Thread Kaymak, Tobias
This sounds like it is related to the problem I'm trying to solve. (In my
case I have a Java POJO containing a protobuf-backed class and am trying to
generate a Beam Schema from it.)

I would be very interested in a solution to this as well :)

On Tue, Jul 7, 2020 at 2:22 PM  wrote:

> Hi All,
>
>
>
> I have a BEAM pipeline where I am reading data from some parquet files and
> converting them into a different format based on protobuf generated classes.
>
>
>
> I wish to associate a schema (derived from the protobuf classes) for my
> PCollections.  What is the appropriate way to do this with
> protobuf-generated classes?
>
>
>
> Code excerpt:
>
>
>
> PCollection result = input.apply("FXFilePattern",
> FileIO.match().filepattern(fxDataFilePattern))
> .apply("FXReadMatches", FileIO.readMatches())
> .apply("FXReadParquetFile", ParquetIO.readFiles(fxAvroSchema))
> .apply("MapFXToProto", ParDo.of(MapFXToProto.of()));
> boolean hasSchema = result.hasSchema();  // returns false
>
>
>
> With thanks in advance.
>
>
>
> Kind regards,
>
>
>
> Rob
>
>
>
> *Robert Butcher*
>
> *Technical Architect | Foundry/SRS | NatWest Markets*
>
> WeWork, 10 Devonshire Square, London, EC2M 4AE
>
> Mobile +44 (0) 7414 730866
>
>


Re: Beam supports Flink Async IO operator

2020-07-09 Thread Kaymak, Tobias
Hi Eleanore,

Maybe batched RPC is what you are looking for?
https://beam.apache.org/blog/timely-processing/
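
A minimal sketch of the batching idea in plain Java, without any Beam classes: elements are buffered and handed to a callback once the batch is full. `BatchingBuffer` and `sendBatch` are hypothetical names for illustration only. In a pipeline the buffer would presumably live in state or inside a DoFn's bundle, and the callback would make one REST call per batch instead of one per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Buffers elements and hands them to a callback in batches of at most maxBatchSize. */
final class BatchingBuffer<T> {
  private final int maxBatchSize;
  // Hypothetical stand-in for "one RPC per batch".
  private final Consumer<List<T>> sendBatch;
  private final List<T> pending = new ArrayList<>();

  BatchingBuffer(int maxBatchSize, Consumer<List<T>> sendBatch) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be >= 1");
    }
    this.maxBatchSize = maxBatchSize;
    this.sendBatch = sendBatch;
  }

  /** Adds one element and flushes as soon as the batch is full. */
  void add(T element) {
    pending.add(element);
    if (pending.size() >= maxBatchSize) {
      flush();
    }
  }

  /** Sends whatever is buffered; does nothing if the buffer is empty. */
  void flush() {
    if (pending.isEmpty()) {
      return;
    }
    // Copy so the callback keeps its batch after the buffer is cleared.
    sendBatch.accept(new ArrayList<>(pending));
    pending.clear();
  }
}
```

Calling `flush()` at the end of a bundle (or from a timer) makes sure a partially filled batch is not left behind.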

On Wed, Jul 8, 2020 at 6:20 PM Eleanore Jin  wrote:

> Thanks Luke and Max for the information.
>
> We have the use case that inside a DoFn, we will need to call external
> services to trigger some other flows. The calls to other services are REST
> based sync calls, and it will take 150 milliseconds plus to return. We are
> using Flink as the runner and I came across this Async I/O operator from
> flink, trying to figure out if this is the right approach and if Beam
> provides any similar concept for it.
>
> Thanks!
> Eleanore
>
> On Wed, Jul 8, 2020 at 2:55 AM Maximilian Michels  wrote:
>
>> Just to clarify: We could make the AsyncIO operator also available in
>> Beam but the operator has to be represented by a concept in Beam.
>> Otherwise, there is no way to know when to produce it as part of the
>> translation.
>>
>> On 08.07.20 11:53, Maximilian Michels wrote:
>> > Flink's AsyncIO operator is useful for processing I/O-bound operations,
>> > e.g. sending network requests. Like Luke mentioned, it is not available
>> > in Beam.
>> >
>> > -Max
>> >
>> > On 07.07.20 22:11, Luke Cwik wrote:
>> >> Beam is a layer that sits on top of execution engines like Flink and
>> >> provides its own programming model thus native operators like Flink's
>> >> async IO operator are not exposed.
>> >>
>> >> Most people use a DoFn to do all their IO and sometimes will compose
>> >> it with another transform such as GroupIntoBatches[1] to simplify
>> >> their implementation.
>> >>
>> >> Why do you need async?
>> >>
>> >> 1:
>> >>
>> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>> >>
>> >>
>> >>
>> >> On Tue, Jul 7, 2020 at 11:03 AM Eleanore Jin wrote:
>> >>
>> >> Hi community,
>> >>
>> >> I cannot find any documentation for Beam supporting Flink async IO
>> >> operator
>> >>
>> >> (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html),
>>
>> >>
>> >> just wonder is this not supported right now?
>> >>
>> >> Thanks a lot!
>> >> Eleanore
>> >>
>>
>


Re: Recommended way to generate a TableRow from Json without using TableRowJsonCoder context OUTER?

2020-07-09 Thread Kaymak, Tobias
break;
>>         } else if (value.equals(com.google.protobuf.BoolValue.getDefaultInstance())) {
>>           value = false;
>>           return tableRow.set(field.getName(), value);
>>         } else {
>>           return tableRow.set(field.getName(), ((BoolValue) value).getValue());
>>         }
>>       case "google.protobuf.StringValue":
>>         if (field.hasDefaultValue()) {
>>           break;
>>         } else if (value.equals(com.google.protobuf.StringValue.getDefaultInstance())) {
>>           value = "";
>>           return tableRow.set(field.getName(), value);
>>         } else if (isUUIDField(field.getName())) {
>>           if (field.isRepeated()) {
>>             return tableRow.set(
>>                 field.getName(),
>>                 processRepeatedField(
>>                     value, x -> UUIDUtil.getBase64FromUUID(((StringValue) x).getValue())));
>>           } else {
>>             return tableRow.set(
>>                 field.getName(),
>>                 UUIDUtil.getBase64FromUUID(((StringValue) value).getValue()));
>>           }
>>         } else if (field.isRepeated()) {
>>           return tableRow.set(
>>               field.getName(),
>>               processRepeatedField(value, x -> ((StringValue) x).getValue()));
>>         } else {
>>           return tableRow.set(field.getName(), ((StringValue) value).getValue());
>>         }
>>       case "google.protobuf.BytesValue":
>>         if (field.hasDefaultValue()) {
>>           break;
>>         } else if (value.equals(com.google.protobuf.BytesValue.getDefaultInstance())) {
>>           value = ByteString.EMPTY;
>>           return tableRow.set(field.getName(), value);
>>         } else if (field.isRepeated()) {
>>           return tableRow.set(
>>               field.getName(),
>>               processRepeatedField(
>>                   value,
>>                   x ->
>>                       Base64.getEncoder()
>>                           .encodeToString(((BytesValue) x).getValue().toByteArray())));
>>         } else {
>>           return tableRow.set(
>>               field.getName(),
>>               Base64.getEncoder()
>>                   .encodeToString(((BytesValue) value).getValue().toByteArray()));
>>         }
>>       default:
>>         if (field.isRepeated()) {
>>           return tableRow.set(
>>               field.getName(),
>>               processRepeatedField(
>>                   value, x -> convertEventToTableRow(new TableRow(), (Message) x)));
>>         } else {
>>           return tableRow.set(
>>               field.getName(),
>>               convertEventToTableRow(new TableRow(), (Message) value));
>>         }
>>     }
>>   default:
>>     throw new IllegalArgumentException(
>>         field.getFullName() + " has an unsupported type " + field.getType());
>>   }
>> }
>>
>>
>> On Wed, Jul 8, 2020 at 4:40 PM Kaymak, Tobias 
>> wrote:
>>
>>> As a workaround I am currently using the following code to generate a
>>> TableRow object from a Java Protobuf class - as I am facing a problem with
>>> Beam schemas (
>>> https://www.mail-archive.com/user@beam.apache.org/msg05799.html).
>>>
>>> It relies on the TableRowJsonCoder:
>>>
>>>   String json =
>>> JsonFormat.printer().omittingInsignificantWhitespace()
>>>   .preservingProtoFieldNames().print(article.toBuilder());
>>>   InputStream inputStream = new
>>> ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
>>>
>>>   TableRow tableRow = TableRowJsonCoder.of().decode(inputStream,
>>> Coder.Context.OUTER);
>>>
>>> However, the usage of Coder.Context is deprecated - I've tried to
>>> simply use the decode(), but that defaults to Context.NESTED.
>>>
>>> What is the correct way of doing this?
>>>
>>> Best,
>>> Tobi
>>>
>>


Recommended way to generate a TableRow from Json without using TableRowJsonCoder context OUTER?

2020-07-08 Thread Kaymak, Tobias
As a workaround I am currently using the following code to generate a
TableRow object from a Java Protobuf class - as I am facing a problem with
Beam schemas (
https://www.mail-archive.com/user@beam.apache.org/msg05799.html).

It relies on the TableRowJsonCoder:

  String json = JsonFormat.printer().omittingInsignificantWhitespace()
      .preservingProtoFieldNames().print(article.toBuilder());
  InputStream inputStream =
      new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));

  TableRow tableRow =
      TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);

However, the usage of Coder.Context is deprecated - I've tried to simply
use the decode(), but that defaults to Context.NESTED.

What is the correct way of doing this?

Best,
Tobi
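
A rough illustration of why the context matters, in plain Java rather than Beam's coder classes. As far as I understand, a nested encoding has to mark where the value ends (here with a length prefix), while an outer encoding may consume the whole stream, so raw JSON bytes only decode in the outer style. `ContextSketch` and its methods are hypothetical, and the fixed 4-byte prefix is a simplification for the sketch, not Beam's actual wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ContextSketch {
  /** "Outer" style: the value owns the whole buffer, so the bytes are written as-is. */
  static byte[] encodeOuter(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  static String decodeOuter(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** "Nested" style: a length prefix marks where the value ends inside a larger stream. */
  static byte[] encodeNested(String s) {
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    // Simplification: fixed 4-byte big-endian length followed by the payload.
    return ByteBuffer.allocate(4 + utf8.length).putInt(utf8.length).put(utf8).array();
  }

  static String decodeNested(byte[] bytes) {
    ByteBuffer in = ByteBuffer.wrap(bytes);
    if (in.remaining() < 4) {
      throw new IllegalArgumentException("missing length prefix");
    }
    int length = in.getInt();
    // Raw JSON has no prefix, so its first bytes turn into a nonsensical length.
    if (length < 0 || length > in.remaining()) {
      throw new IllegalArgumentException("bad length prefix: " + length);
    }
    byte[] utf8 = new byte[length];
    in.get(utf8);
    return new String(utf8, StandardCharsets.UTF_8);
  }
}
```

Feeding the output of `encodeOuter` into `decodeNested` fails on the length check, which mirrors the kind of mismatch one would expect when raw JSON bytes are decoded in a nested context.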


Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-30 Thread Kaymak, Tobias
 the
>> stack overflow. Does ArticleEnvelope refer to itself recursively? Beam
>> schemas are not allowed to be recursive, and it looks like we don't fail
>> gracefully for recursive proto definitions.
>>
>> Brian
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-10265
>>
>> On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette 
>> wrote:
>>
>>> Hm it looks like the error is from trying to call the zero-arg
>>> constructor for the ArticleEnvelope proto class. Do you have a schema
>>> registered for ArticleEnvelope?
>>>
>>> I think maybe what's happening is Beam finds there's no schema
>>> registered for ArticleEnvelope, so it just recursively
>>> applies JavaFieldSchema, which generates code that attempts to use the
>>> zero-arg constructor. It looks like that's a bug in JavaFieldSchema, we
>>> should fail earlier with a better message rather than just generating code
>>> that will try to access a private constructor, I filed a jira for this [1].
>>>
>>> I think you can get this working if you register a Schema for
>>> ArticleEnvelope. I'm not actually sure of the best way to do this since
>>> it's generated code and you can't use @DefaultSchema (+Reuven Lax and
>>> +Alex Van Boxel in case they have better advice). You might try just
>>> registering a provider manually when you create the pipeline, something like
>>> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
>>> new ProtoMessageSchema())`.
>>>
>>> Brian
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-10372
>>>
>>> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias 
>>> wrote:
>>>
>>>> A bit more context - I started with the Beam documentation and
>>>> tried JavaFieldSchema and JavaBeanSchema first, when that didn't work, I
>>>> dug deeper and tried to implement the methods myself.
>>>>
>>>> What I also tried is the following class definition:
>>>>
>>>> @DefaultSchema(JavaFieldSchema.class)
>>>> public class EnrichedArticle implements Serializable {
>>>>
>>>>   // ArticleEnvelope is generated from Protobuf
>>>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>>>   // Asset is a Java POJO
>>>>   @Nullable public List<Asset> assets;
>>>>
>>>>   @SchemaCreate
>>>>   public EnrichedArticle() {}
>>>>
>>>>   @SchemaCreate
>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>>>     this.article = article;
>>>>     this.assets = assets;
>>>>   }
>>>> }
>>>>
>>>> This throws the following exception:
>>>>
>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>>> java.lang.IllegalAccessError: tried to access method
>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>>> from class
>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>> ...
>>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>>>> from class
>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>>> at
>>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown
>>>> Source)
>>>> at
>>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>>>> at
>>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>>>> at
>>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>>>> at
>>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>>>> at
>>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>>>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>>> at
>>>> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>>>> at
>>>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>>&

Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-27 Thread Kaymak, Tobias
A bit more context: I started with the Beam documentation and tried
JavaFieldSchema and JavaBeanSchema first. When that didn't work, I dug
deeper and tried to implement the methods myself.

What I also tried is the following class definition:

@DefaultSchema(JavaFieldSchema.class)
public class EnrichedArticle implements Serializable {

  // ArticleEnvelope is generated from Protobuf
  @Nullable public ArticleProto.ArticleEnvelope article;
  // Asset is a Java POJO
  @Nullable public List<Asset> assets;

  @SchemaCreate
  public EnrichedArticle() {}

  @SchemaCreate
  public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
    this.article = article;
    this.assets = assets;
  }
}

This throws the following exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.IllegalAccessError: tried to access method
ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
from class
ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
...
Caused by: java.lang.IllegalAccessError: tried to access method
ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
from class
ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown Source)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
at ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
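
A small plain-Java sketch of the situation the error above points at: code that wants to call a no-argument constructor which the class keeps private, as protobuf-generated message classes do. `ConstructorCheck`, `GeneratedMessageLike` and `PlainPojo` are made-up names for illustration; the check uses only standard reflection and is not part of Beam.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

final class ConstructorCheck {
  /** Stand-in for a generated message class: the no-arg constructor is private. */
  static final class GeneratedMessageLike {
    private GeneratedMessageLike() {}

    static GeneratedMessageLike newInstance() {
      return new GeneratedMessageLike();
    }
  }

  /** Stand-in for an ordinary POJO with a public no-arg constructor. */
  static final class PlainPojo {
    public PlainPojo() {}
  }

  /** Returns true if the class declares a no-arg constructor that is not private. */
  static boolean hasAccessibleNoArgConstructor(Class<?> type) {
    try {
      Constructor<?> ctor = type.getDeclaredConstructor();
      return !Modifier.isPrivate(ctor.getModifiers());
    } catch (NoSuchMethodException e) {
      // No no-arg constructor declared at all.
      return false;
    }
  }
}
```

Running such a check over the field types of a POJO before relying on field-based schema inference could flag the generated class early, instead of failing at runtime inside generated code.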


On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias 
wrote:

> Hi Brian,
>
> Thank you for your response.
>
> 1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class)
> and my constructor with @SchemaCreate, I get the following exception:
>
> Caused by: java.lang.IllegalAccessError: tried to access method
> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
> from class
> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
> at
> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown
> Source)
>
> 2. When I annotate the class with @DefaultSchema(JavaBeanSchema.class),
> make the fields private and generate Getters/Setters I get a StackOverflow
> error:
>
> java.lang.StackOverflowError
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
> at
> org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
> at
> org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
> at
> org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
> at
> org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
> at
> java.util.stream.ReferencePipeline$3$1.accept(

Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-27 Thread Kaymak, Tobias
Hi Brian,

Thank you for your response.

1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class) and
my constructor with @SchemaCreate, I get the following exception:

Caused by: java.lang.IllegalAccessError: tried to access method
ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
from class
ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
at ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown Source)

2. When I annotate the class with @DefaultSchema(JavaBeanSchema.class),
make the fields private and generate Getters/Setters I get a StackOverflow
error:

java.lang.StackOverflowError
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
at
org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
at
org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
at
org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
at
org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
at
org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
at
org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
[...]

2.1 When I make the fields public, the pipeline executes, but the
PCollection does not have a schema associated with it, which causes the
next pipeline step (BigQueryIO) to fail.

I want to try AutoValue as well, but that requires some more changes to my
code.

- I tried supplying the ProtoMessageSchema().toRowFunction
and ProtoMessageSchema().schemaFor() for the Protobuf conversion to the
pipeline
- I tried writing my own toRow/fromRow/getSchema functions for the
EnrichedArticle and supplying that to the pipeline

Where can I put the breakpoints to get a better understanding of what is
happening here?
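
For the StackOverflowError in point 2, one thing that might be worth ruling out is a type that refers back to itself, since schema inference that walks fields recursively would never terminate on such a type. This is only a guess at the cause. The sketch below uses plain reflection; `RecursionCheck`, `Node` and `Leaf` are hypothetical names, and it deliberately ignores element types of generic collections such as `List<T>`.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayDeque;
import java.util.Deque;

final class RecursionCheck {
  /** Example of a self-referencing type. */
  static final class Node {
    Node next;
    int value;
  }

  /** Example of a type without any cycle. */
  static final class Leaf {
    String name;
    int[] counts;
  }

  /** Returns true if following instance fields from root leads back to a type on the path. */
  static boolean refersToItself(Class<?> root) {
    return visit(root, new ArrayDeque<>());
  }

  private static boolean visit(Class<?> type, Deque<Class<?>> path) {
    // Primitives and JDK types are treated as leaves in this sketch.
    if (type.isPrimitive() || type.getName().startsWith("java.")) {
      return false;
    }
    if (path.contains(type)) {
      return true;
    }
    path.push(type);
    try {
      for (Field field : type.getDeclaredFields()) {
        if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
          continue;
        }
        Class<?> fieldType = field.getType();
        while (fieldType.isArray()) {
          fieldType = fieldType.getComponentType();
        }
        if (visit(fieldType, path)) {
          return true;
        }
      }
      return false;
    } finally {
      path.pop();
    }
  }
}
```

If such a check reports a cycle for the generated message class, a breakpoint in the schema inference frames shown in the stack trace above would presumably show the same type being visited over and over.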



On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette  wrote:

> Hi Tobias,
>
> You should be able to annotate the EnrichedArticle class with a
> @DefaultSchema annotation and Beam will infer a schema for it. You would
> need to make some tweaks to the class though to be compatible with the
> built-in schema providers: you could make the members public and use
> JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or make
> it into an AutoValue and use AutoValueSchema.
>
> Once you do that you should be able to convert a
> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].
>
> Brian
>
> [1]
> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
>
> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias 
> wrote:
>
>> I have the following class definition:
>>
>> public class EnrichedArticle implements Serializable {
>>
>>   // ArticleEnvelope is generated via Protobuf
>>   private ArticleProto.ArticleEnvelope article;
>>   // Asset is a Java POJO
>>   private List<Asset> assets;
>>
>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
>>     this.article = article;
>>     this.assets = assets;
>>   }
>> }
>>
>> I am trying to generate a SerializableFunction and
>> a Schema for it so that I can pass it easily to my BigQueryIO at the end of
>> my pipeline. Transforming the article to a Row object is straightforward:
>>
>> First I get the toRow() function for it via the helper:
>>
>>  new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
>>   ArticleProto.ArticleEnvelope.class));
>>
>> Then I just apply that function to the article field.
>> However, I don't know how to manually transform my list of assets (a
>> simple Java POJO annotated with @DefaultSchema(JavaFieldSchema.class))
>> in my EnrichedArticle container/composition class. What's the recommended
>> way of doing this?
>>
>>
>>
>>


[Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-26 Thread Kaymak, Tobias
I have the following class definition:

public class EnrichedArticle implements Serializable {

  // ArticleEnvelope is generated via Protobuf
  private ArticleProto.ArticleEnvelope article;
  // Asset is a Java POJO
  private List<Asset> assets;

  public EnrichedArticle(ArticleProto.ArticleEnvelope article, List<Asset> assets) {
    this.article = article;
    this.assets = assets;
  }
}

I am trying to generate a SerializableFunction and a
Schema for it so that I can pass it easily to my BigQueryIO at the end of
my pipeline. Transforming the article to a Row object is straightforward:

First I get the toRow() function for it via the helper:

 new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
  ArticleProto.ArticleEnvelope.class));

Then I just apply that function to the article field.
However, I don't know how to manually transform my list of assets (a
simple Java POJO annotated with @DefaultSchema(JavaFieldSchema.class))
in my EnrichedArticle container/composition class. What's the recommended
way of doing this?


Enriching a stream by looking up from a huge table (2 TiB)+

2020-05-13 Thread Kaymak, Tobias
Hi,

First of all thank you for the Webinar Beam Sessions this month. They are
super helpful especially for getting people excited and on-boarded with
Beam!

We are currently trying to promote Beam with more use cases within our
company and are tackling a problem where we have to join a stream of articles
with asset information. The asset information, as a table, has a size of 2
TiB+, so we think the only way to enrich the stream would be to keep it in
a fast lookup store so that the (batched) RPC pattern can be applied. (In
Google Cloud product terms, that would mean something like Bigtable or a
similar fast and big key/value store.)

Is there an alternative that we could try? Maintaining that additional data
store would add overhead we are looking to avoid. :)

Best,
Tobi
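
To make the (batched) RPC pattern mentioned above concrete, here is a minimal plain-Java sketch: the keys of one batch are de-duplicated and resolved with a single bulk lookup, then each input is paired with its result. `BatchedLookup` and the `multiGet` function are hypothetical; `multiGet` stands in for whatever bulk read the lookup store offers and is not a real client API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class BatchedLookup {
  /**
   * Pairs every key with its looked-up value using one bulk call per batch.
   * Keys that the store does not know are paired with null.
   */
  static <K, V> List<Map.Entry<K, V>> enrich(
      List<K> keys, Function<Set<K>, Map<K, V>> multiGet) {
    // De-duplicate so repeated keys in a batch cost only one lookup.
    Set<K> distinctKeys = new LinkedHashSet<>(keys);
    Map<K, V> found = multiGet.apply(distinctKeys);

    List<Map.Entry<K, V>> enriched = new ArrayList<>();
    for (K key : keys) {
      enriched.add(new AbstractMap.SimpleImmutableEntry<>(key, found.get(key)));
    }
    return enriched;
  }
}
```

The sketch does not remove the need for a lookup store; it only shows how the number of round trips can be kept at one per batch rather than one per element.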


Re: Hello Beam Community!

2020-03-17 Thread Kaymak, Tobias
Welcome Brittany! :)

On Fri, Mar 13, 2020 at 7:30 PM Robert Bradshaw  wrote:

> Welcome!
>
> On Fri, Mar 13, 2020 at 7:41 AM Ismaël Mejía  wrote:
>
>> Welcome !
>>
>> On Fri, Mar 13, 2020 at 3:00 PM Connell O'Callaghan 
>> wrote:
>>
>>> Welcome Brittany
>>>
>>> On Fri, Mar 13, 2020 at 6:45 AM Rustam Mehmandarov <
>>> mehmanda...@gmail.com> wrote:
>>>
>>>> Welcome Brittany!
>>>>
>>>> Cheers,
>>>> Rustam
>>>> @rmehmandarov
>>>>
>>>> On Fri, Mar 13, 2020 at 2:31 AM Brittany Hermann
>>>> wrote:
>>>>
>>>>> Hello Beam Community!
>>>>>
>>>>> My name is Brittany Hermann and I recently joined the Open Source team
>>>>> in Data Analytics at Google. As a Program Manager, I will be focusing on
>>>>> community engagement while getting to work on Apache Beam and Airflow
>>>>> projects! I have always thrived on creating healthy, diverse, and overall
>>>>> happy communities and am excited to bring that to the team. For a fun
>>>>> fact, I am a big Wisconsin Badgers Football fan and have a goldendoodle
>>>>> puppy named Ollie!
>>>>>
>>>>> I look forward to collaborating with you all!
>>>>>
>>>>> Kind regards,
>>>>>
>>>>> Brittany Hermann
>>>>>
>>>>> --
>>>>>
>>>>> Brittany Hermann
>>>>>
>>>>> Open Source Program Manager (Provided by Adecco Staffing)
>>>>>
>>>>> 1190 Bordeaux Drive, Building 4, Sunnyvale, CA 94089
>>>>>


Re: Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-03-02 Thread Kaymak, Tobias
Good morning Max and thanks for clarifying!

I generated the JAR 2.19.0 in the second test via the default demo code
from Beam. There were no further adjustments from my side, but as I can see
there are some open points in JIRA for 1.9.2, so for now I think that we
can focus on 1.9.1 as a target.

To understand the fail-safety paradigm correctly: it is my understanding
that when I hit stop, my pipeline (reading from Kafka, writing to BigQuery)
will stop consuming from Kafka and wait until all bundles have been written
out to BigQuery. That means it will also correctly persist the offset of
the last message it has successfully read from Kafka. When it gets started
again, it will resume at that offset.

In contrast, hitting cancel causes my pipeline to drop everything it is
doing immediately, leaving me (without taking a snapshot) in an unclear
state.

For operations it is useful to have a cancel button to quickly shut down a
pipeline and completely re-read from a streaming source, for example when
there was a critical bug in the pipeline processing code, or when a team
that was sending data through that source released a feature earlier than
announced and some data was missed because the schema had not yet been
updated in the pipeline.

On the other hand, it is useful to have a stop button to cleanly shut down
a pipeline when a feature is going to be released tomorrow and the pipeline
processing code should be updated beforehand, without the overhead of
re-reading the complete past. Taking a snapshot uses the current
representation of the Beam code as a Flink job, but what if one wants to
update the Beam code, and thus the pipeline code, without reprocessing the
whole history? Moreover, a stop button is very useful when a new Flink
version is going to be rolled out: one can drain all pipelines (for us
there are right now 31), roll out the new Flink version and start them at
the point where they left off, with their last committed offset in Kafka.

Does that make sense?

Best,
Tobi

On Sun, Mar 1, 2020 at 5:23 PM Maximilian Michels  wrote:

> In some sense, stop is different because sources will be stopped first
> and then all the downstream operators will stop automatically. However,
> in terms of correctness of your program using cancel or stop does not
> make a difference because neither approach takes a checkpoint.
>
> Only at the time of a checkpoint you are guaranteed to have a consistent
> state. A checkpoint is the only way to resume a canceled/stopped
> pipeline correctly.
>
> Does that make sense? I understand that stop may have been convenient
> for your use case. If so, we may consider adding it again.
>
> Cheers,
> Max
>
> PS: Concerning the web interface with 1.9.2, I'm not sure what changes
> your Jar contains but we'll have to look into this when we upgrade to
> 1.9.2 in Beam.
>
> On 28.02.20 14:59, Kaymak, Tobias wrote:
> > I investigated further:
> >
> > As Flink 1.9.1 works and Flink 1.9.2 does not, I simply tried a rollout
> > of a vanilla flink:1.9.2-scala_2.11 image to K8s and that worked. So the
> > issue must be in my image or the JAR I am attaching:
> >
> > ARG FLINK_VERSION=1.9.2
> > ARG SCALA_VERSION=2.11
> > FROM flink:${FLINK_VERSION}-scala_${SCALA_VERSION}
> >
> > COPY --chown=flink:flink conf/log4j-console.properties
> > /opt/flink/conf/log4j-console.properties
> > ADD --chown=flink:flink
> >
> https://repo1.maven.org/maven2/org/apache/flink/flink-metrics-prometheus_${SCALA_VERSION}/${FLINK_VERSION}/flink-metrics-prometheus_${SCALA_VERSION}-${FLINK_VERSION}.jar
> >
> /opt/flink/lib/flink-metrics-prometheus_${SCALA_VERSION}-${FLINK_VERSION}.jar
> > ADD --chown=flink:flink
> >
> https://repo1.maven.org/maven2/org/apache/flink/flink-statebackend-rocksdb_${SCALA_VERSION}/${FLINK_VERSION}/flink-statebackend-rocksdb_${SCALA_VERSION}-${FLINK_VERSION}.jar
> >
> /opt/flink/lib/flink-statebackend-rocksdb_${SCALA_VERSION}-${FLINK_VERSION}.jar
> > ADD --chown=flink:flink deployment/run.sh /opt/flink/run.sh
> > RUN chmod +x /opt/flink/run.sh
> >
> > COPY --from=builder --chown=flink:flink
> > /build/target/di-beam-bundled.jar /opt/flink/lib/beam_pipelines.jar
> >
> > Commenting out the last COPY step - to circumvent the addition of the
> > fat Beam JAR - did the trick. So I think my beam JAR contains something
> > Flink does not like. Next thing I tried was building a vanilla Beam JAR
> via:
> >
> > mvn archetype:generate \
> >-DarchetypeGroupId=org.apache.beam \
> >-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
> >-DarchetypeVersion=2.19.0 \
> >-DgroupId=org.example \
> >-DartifactId=word-count-beam \
&

Re: Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-02-27 Thread Kaymak, Tobias
What I found so far is that the "Stop" button next to the "Cancel" button
is missing in Flink 1.9.1's web interface when I run my Beam
2.19.0/2.20.0-SNAPSHOT streaming pipeline. I haven't figured out yet
whether it has been removed by the Flink team on purpose or whether
something is "missing" in the Beam translation layer.

Best,
Tobias

On Thu, Feb 27, 2020 at 1:44 PM Ismaël Mejía  wrote:

> Both Flink 1.9.2 and 1.10.0 are not supported yet on Beam, probably they
> will be part of the 2.21.0 release
> You can follow the progress on both issues (and help us with early testing
> once in master):
>
> BEAM-9295 Add Flink 1.10 build target and Make FlinkRunner compatible with
> Flink 1.10
> https://issues.apache.org/jira/browse/BEAM-9295
>
> BEAM-9299 Upgrade Flink Runner to 1.8.3 and 1.9.2
> https://issues.apache.org/jira/browse/BEAM-9299
>
> Regards,
> Ismaël
>
>
> On Thu, Feb 27, 2020 at 11:53 AM Kaymak, Tobias 
> wrote:
>
>> Copy paste error, sorry:
>>
>> 2.20.0-SNAPSHOT in combination with beam-runners-flink-1.10
>> or beam-runners-flink-1.10-SNAPSHOT didn't work either for me.
>>
>>
>> On Thu, Feb 27, 2020 at 11:50 AM Kaymak, Tobias 
>> wrote:
>>
>>> I can confirm that the pipeline behaves as expected with 2.20.0-SNAPSHOT
>>> and Flink 1.9.1 - I also tried Flink 1.9.2 but the webinterface didn't show
>>> up (just a blank page - javascript was being loaded though).
>>> I emptied my cache and investigated the log and asked on the Flink
>>> mailing list if this is known - maybe it's also because of one of the
>>> dependencies in my fat Beam jar. I am still investigating this.
>>>
>>> How can I test the Flink 1.10 runners? (The following POM is not
>>> resolvable by maven)
>>>
>>>
>>> <dependency>
>>>   <groupId>org.apache.beam</groupId>
>>>   <artifactId>beam-runners-flink-1.10</artifactId>
>>>   <version>2.20-SNAPSHOT</version>
>>> </dependency>
>>> 
>>>
>>> Best,
>>> Tobi
>>>
>>> On Wed, Feb 26, 2020 at 5:07 PM Ismaël Mejía  wrote:
>>>
>>>> Since it was merged yesterday you can test with the 2.20.0-SNAPSHOT
>>>> until the first candidate is out.
>>>>
>>>> On Wed, Feb 26, 2020 at 4:37 PM Kaymak, Tobias <
>>>> tobias.kay...@ricardo.ch> wrote:
>>>>
>>>>> If I am not running in detached mode (so that my pipeline starts) I am
>>>>> unable to Stop it in the webinterface. The only option available is to
>>>>> cancel it. Is this expected?
>>>>>
>>>>> [image: Screenshot 2020-02-26 at 16.34.08.png]
>>>>>
>>>>> On Wed, Feb 26, 2020 at 4:16 PM Kaymak, Tobias <
>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> we fixed the issue and are ready to test :) - is there a RC already
>>>>>> available?
>>>>>>
>>>>>> Best,
>>>>>> Tobi
>>>>>>
>>>>>> On Wed, Feb 26, 2020 at 12:59 PM Kaymak, Tobias <
>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> happy to help testing! I am currently fixing a networking issue
>>>>>>> between our dev cluster for integration tests and the Kafka it is 
>>>>>>> consuming
>>>>>>> from.
>>>>>>> After that I would be ready to spin it up and test
>>>>>>>
>>>>>>> Best,
>>>>>>> Tobi
>>>>>>>
>>>>>>> On Mon, Feb 24, 2020 at 10:13 PM Maximilian Michels 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thank you for reporting / filing / collecting the issues.
>>>>>>>>
>>>>>>>> There is a fix pending: https://github.com/apache/beam/pull/10950
>>>>>>>>
>>>>>>>> As for the upgrade issues, the 1.8 and 1.9 upgrade is trivial. I
>>>>>>>> will
>>>>>>>> check out the Flink 1.10 PR tomorrow.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On 24.02.20 09:26, Ismaël Mejía wrote:
>>>>>>>> > We are cutting the release branch for 2.20.0 next wednesday, so
>>

Re: Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-02-27 Thread Kaymak, Tobias
Copy paste error, sorry:

2.20.0-SNAPSHOT in combination with beam-runners-flink-1.10
or beam-runners-flink-1.10-SNAPSHOT didn't work either for me.


On Thu, Feb 27, 2020 at 11:50 AM Kaymak, Tobias 
wrote:

> I can confirm that the pipeline behaves as expected with 2.20.0-SNAPSHOT
> and Flink 1.9.1 - I also tried Flink 1.9.2 but the webinterface didn't show
> up (just a blank page - javascript was being loaded though).
> I emptied my cache and investigated the log and asked on the Flink mailing
> list if this is known - maybe it's also because of one of the
> dependencies in my fat Beam jar. I am still investigating this.
>
> How can I test the Flink 1.10 runners? (The following POM is not
> resolvable by maven)
>
>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-runners-flink-1.10</artifactId>
>   <version>2.20-SNAPSHOT</version>
> </dependency>
> 
>
> Best,
> Tobi
>
> On Wed, Feb 26, 2020 at 5:07 PM Ismaël Mejía  wrote:
>
>> Since it was merged yesterday you can test with the 2.20.0-SNAPSHOT until
>> the first candidate is out.
>>
>> On Wed, Feb 26, 2020 at 4:37 PM Kaymak, Tobias 
>> wrote:
>>
>>> If I am not running in detached mode (so that my pipeline starts) I am
>>> unable to Stop it in the webinterface. The only option available is to
>>> cancel it. Is this expected?
>>>
>>> [image: Screenshot 2020-02-26 at 16.34.08.png]
>>>
>>> On Wed, Feb 26, 2020 at 4:16 PM Kaymak, Tobias 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> we fixed the issue and are ready to test :) - is there a RC already
>>>> available?
>>>>
>>>> Best,
>>>> Tobi
>>>>
>>>> On Wed, Feb 26, 2020 at 12:59 PM Kaymak, Tobias <
>>>> tobias.kay...@ricardo.ch> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> happy to help testing! I am currently fixing a networking issue
>>>>> between our dev cluster for integration tests and the Kafka it is 
>>>>> consuming
>>>>> from.
>>>>> After that I would be ready to spin it up and test
>>>>>
>>>>> Best,
>>>>> Tobi
>>>>>
>>>>> On Mon, Feb 24, 2020 at 10:13 PM Maximilian Michels 
>>>>> wrote:
>>>>>
>>>>>> Thank you for reporting / filing / collecting the issues.
>>>>>>
>>>>>> There is a fix pending: https://github.com/apache/beam/pull/10950
>>>>>>
>>>>>> As for the upgrade issues, the 1.8 and 1.9 upgrade is trivial. I will
>>>>>> check out the Flink 1.10 PR tomorrow.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On 24.02.20 09:26, Ismaël Mejía wrote:
>>>>>> > We are cutting the release branch for 2.20.0 next wednesday, so not
>>>>>> sure
>>>>>> > if these tickets will make it, but hopefully.
>>>>>> >
>>>>>> > For ref,
>>>>>> > BEAM-9295 Add Flink 1.10 build target and Make FlinkRunner
>>>>>> compatible
>>>>>> > with Flink 1.10
>>>>>> > BEAM-9299 Upgrade Flink Runner to 1.8.3 and 1.9.2
>>>>>> >
>>>>>> > In any case if you have cycles to help test any of the related
>>>>>> tickets
>>>>>> > PRs that would help too.
>>>>>> >
>>>>>> >
>>>>>> > On Mon, Feb 24, 2020 at 8:47 AM Kaymak, Tobias <
>>>>>> tobias.kay...@ricardo.ch
>>>>>> > <mailto:tobias.kay...@ricardo.ch>> wrote:
>>>>>> >
>>>>>> > Hi Kyle,
>>>>>> >
>>>>>> > thank you for creating the JIRA ticket, I think my best option
>>>>>> right
>>>>>> > now is to wait for a Beam version that is running on Flink 1.10
>>>>>> then
>>>>>> > - unless there is a new Beam release around the corner :)
>>>>>> >
>>>>>> > Best,
>>>>>> > Tobi
>>>>>> >
>>>>>> > On Thu, Feb 20, 2020 at 11:52 PM Kyle Weaver <
>>>>>> kcwea...@google.com
>>>>>> > <mailto:kcwea...@google.com>> wrote:
>>>>>> >
>>>>>> > Hi To

Re: Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-02-27 Thread Kaymak, Tobias
I can confirm that the pipeline behaves as expected with 2.20.0-SNAPSHOT
and Flink 1.9.1. I also tried Flink 1.9.2, but the web interface didn't show
up (just a blank page, although the JavaScript was being loaded).
I emptied my cache, investigated the log, and asked on the Flink mailing
list whether this is known. It might also be caused by one of the
dependencies in my fat Beam JAR; I am still investigating this.

How can I test the Flink 1.10 runners? (The following POM dependency is not
resolvable by Maven.)

   
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-flink-1.10</artifactId>
  <version>2.20-SNAPSHOT</version>
</dependency>


Best,
Tobi

On Wed, Feb 26, 2020 at 5:07 PM Ismaël Mejía  wrote:

> Since it was merged yesterday you can test with the 2.20.0-SNAPSHOT until
> the first candidate is out.
>
> On Wed, Feb 26, 2020 at 4:37 PM Kaymak, Tobias 
> wrote:
>
>> If I am not running in detached mode (so that my pipeline starts) I am
>> unable to Stop it in the webinterface. The only option available is to
>> cancel it. Is this expected?
>>
>> [image: Screenshot 2020-02-26 at 16.34.08.png]
>>
>> On Wed, Feb 26, 2020 at 4:16 PM Kaymak, Tobias 
>> wrote:
>>
>>> Hello,
>>>
>>> we fixed the issue and are ready to test :) - is there a RC already
>>> available?
>>>
>>> Best,
>>> Tobi
>>>
>>> On Wed, Feb 26, 2020 at 12:59 PM Kaymak, Tobias <
>>> tobias.kay...@ricardo.ch> wrote:
>>>
>>>> Hello,
>>>>
>>>> happy to help testing! I am currently fixing a networking issue between
>>>> our dev cluster for integration tests and the Kafka it is consuming from.
>>>> After that I would be ready to spin it up and test
>>>>
>>>> Best,
>>>> Tobi
>>>>
>>>> On Mon, Feb 24, 2020 at 10:13 PM Maximilian Michels 
>>>> wrote:
>>>>
>>>>> Thank you for reporting / filing / collecting the issues.
>>>>>
>>>>> There is a fix pending: https://github.com/apache/beam/pull/10950
>>>>>
>>>>> As for the upgrade issues, the 1.8 and 1.9 upgrade is trivial. I will
>>>>> check out the Flink 1.10 PR tomorrow.
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On 24.02.20 09:26, Ismaël Mejía wrote:
>>>>> > We are cutting the release branch for 2.20.0 next wednesday, so not
>>>>> sure
>>>>> > if these tickets will make it, but hopefully.
>>>>> >
>>>>> > For ref,
>>>>> > BEAM-9295 Add Flink 1.10 build target and Make FlinkRunner
>>>>> compatible
>>>>> > with Flink 1.10
>>>>> > BEAM-9299 Upgrade Flink Runner to 1.8.3 and 1.9.2
>>>>> >
>>>>> > In any case if you have cycles to help test any of the related
>>>>> tickets
>>>>> > PRs that would help too.
>>>>> >
>>>>> >
>>>>> > On Mon, Feb 24, 2020 at 8:47 AM Kaymak, Tobias <
>>>>> tobias.kay...@ricardo.ch
>>>>> > <mailto:tobias.kay...@ricardo.ch>> wrote:
>>>>> >
>>>>> > Hi Kyle,
>>>>> >
>>>>> > thank you for creating the JIRA ticket, I think my best option
>>>>> right
>>>>> > now is to wait for a Beam version that is running on Flink 1.10
>>>>> then
>>>>> > - unless there is a new Beam release around the corner :)
>>>>> >
>>>>> > Best,
>>>>> > Tobi
>>>>> >
>>>>> > On Thu, Feb 20, 2020 at 11:52 PM Kyle Weaver <
>>>>> kcwea...@google.com
>>>>> > <mailto:kcwea...@google.com>> wrote:
>>>>> >
>>>>> > Hi Tobi,
>>>>> >
>>>>> > This seems like a bug with Beam 2.19. I filed
>>>>> > https://issues.apache.org/jira/browse/BEAM-9345 to track
>>>>> the issue.
>>>>> >
>>>>> >  > What puzzles me is that the session cluster should be
>>>>> allowed
>>>>> > to have multiple environments in detached mode - or am I
>>>>> wrong?
>>>>> >
>>>>> > It looks like that check is removed in Flink 1.10:
>>>>> > https://issues.apache.

Re: Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-02-26 Thread Kaymak, Tobias
If I am not running in detached mode (so that my pipeline starts), I am
unable to stop it in the web interface. The only option available is to
cancel it. Is this expected?

[image: Screenshot 2020-02-26 at 16.34.08.png]

On Wed, Feb 26, 2020 at 4:16 PM Kaymak, Tobias 
wrote:

> Hello,
>
> we fixed the issue and are ready to test :) - is there a RC already
> available?
>
> Best,
> Tobi
>
> On Wed, Feb 26, 2020 at 12:59 PM Kaymak, Tobias 
> wrote:
>
>> Hello,
>>
>> happy to help testing! I am currently fixing a networking issue between
>> our dev cluster for integration tests and the Kafka it is consuming from.
>> After that I would be ready to spin it up and test
>>
>> Best,
>> Tobi
>>
>> On Mon, Feb 24, 2020 at 10:13 PM Maximilian Michels 
>> wrote:
>>
>>> Thank you for reporting / filing / collecting the issues.
>>>
>>> There is a fix pending: https://github.com/apache/beam/pull/10950
>>>
>>> As for the upgrade issues, the 1.8 and 1.9 upgrade is trivial. I will
>>> check out the Flink 1.10 PR tomorrow.
>>>
>>> Cheers,
>>> Max
>>>
>>> On 24.02.20 09:26, Ismaël Mejía wrote:
>>> > We are cutting the release branch for 2.20.0 next wednesday, so not
>>> sure
>>> > if these tickets will make it, but hopefully.
>>> >
>>> > For ref,
>>> > BEAM-9295 Add Flink 1.10 build target and Make FlinkRunner compatible
>>> > with Flink 1.10
>>> > BEAM-9299 Upgrade Flink Runner to 1.8.3 and 1.9.2
>>> >
>>> > In any case if you have cycles to help test any of the related
>>> tickets
>>> > PRs that would help too.
>>> >
>>> >
>>> > On Mon, Feb 24, 2020 at 8:47 AM Kaymak, Tobias <
>>> tobias.kay...@ricardo.ch
>>> > <mailto:tobias.kay...@ricardo.ch>> wrote:
>>> >
>>> > Hi Kyle,
>>> >
>>> > thank you for creating the JIRA ticket, I think my best option
>>> right
>>> > now is to wait for a Beam version that is running on Flink 1.10
>>> then
>>> > - unless there is a new Beam release around the corner :)
>>> >
>>> > Best,
>>> > Tobi
>>> >
>>> > On Thu, Feb 20, 2020 at 11:52 PM Kyle Weaver <kcwea...@google.com
>>> > <mailto:kcwea...@google.com>> wrote:
>>> >
>>> > Hi Tobi,
>>> >
>>> > This seems like a bug with Beam 2.19. I filed
>>> > https://issues.apache.org/jira/browse/BEAM-9345 to track the
>>> issue.
>>> >
>>> >  > What puzzles me is that the session cluster should be
>>> allowed
>>> > to have multiple environments in detached mode - or am I wrong?
>>> >
>>> > It looks like that check is removed in Flink 1.10:
>>> > https://issues.apache.org/jira/browse/FLINK-15201
>>> >
>>> > Thanks for reporting.
>>> > Kyle
>>> >
>>> > On Thu, Feb 20, 2020 at 4:10 AM Kaymak, Tobias
>>> > <tobias.kay...@ricardo.ch <mailto:tobias.kay...@ricardo.ch>>
>>> wrote:
>>> >
>>> > Hello,
>>> >
>>> > I am trying to upgrade from a Flink session cluster 1.8 to
>>> > 1.9 and from Beam 2.16.0 to 2.19.0.
>>> > Everything went quite smoothly, the local runner and the
>>> > local Flink runner work flawlessly.
>>> >
>>> > However when I:
>>> >1. Generate a Beam jar for the FlinkRunner via maven
>>> (mvn
>>> > package -PFlinkRunner)
>>> >2. Glue that into a Flink 1.9 docker image
>>> >3. Start the image as a Standalone Session Cluster
>>> >
>>> > When I try to launch the first pipeline I get the following
>>> > exception
>>> >
>>> > org.apache.flink.client.program.ProgramInvocationException:
>>> > The main method caused an error: Failed to construct
>>> > instance from factory method
>>> > FlinkRunner#fromOptions(interface
>>> > org.apache.beam.sdk.options.PipelineOptions)
>>> >  at
>>> >
>>>  
>>> org.apache.flink.client.program.PackagedPr

Re: Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-02-26 Thread Kaymak, Tobias
Hello,

We fixed the issue and are ready to test :) Is there an RC available
already?

Best,
Tobi

On Wed, Feb 26, 2020 at 12:59 PM Kaymak, Tobias 
wrote:

> Hello,
>
> happy to help testing! I am currently fixing a networking issue between
> our dev cluster for integration tests and the Kafka it is consuming from.
> After that I would be ready to spin it up and test
>
> Best,
> Tobi
>
> On Mon, Feb 24, 2020 at 10:13 PM Maximilian Michels 
> wrote:
>
>> Thank you for reporting / filing / collecting the issues.
>>
>> There is a fix pending: https://github.com/apache/beam/pull/10950
>>
>> As for the upgrade issues, the 1.8 and 1.9 upgrade is trivial. I will
>> check out the Flink 1.10 PR tomorrow.
>>
>> Cheers,
>> Max
>>
>> On 24.02.20 09:26, Ismaël Mejía wrote:
>> > We are cutting the release branch for 2.20.0 next wednesday, so not
>> sure
>> > if these tickets will make it, but hopefully.
>> >
>> > For ref,
>> > BEAM-9295 Add Flink 1.10 build target and Make FlinkRunner compatible
>> > with Flink 1.10
>> > BEAM-9299 Upgrade Flink Runner to 1.8.3 and 1.9.2
>> >
>> > In any case if you have cycles to help test any of the related tickets
>> > PRs that would help too.
>> >
>> >
>> > On Mon, Feb 24, 2020 at 8:47 AM Kaymak, Tobias <
>> tobias.kay...@ricardo.ch
>> > <mailto:tobias.kay...@ricardo.ch>> wrote:
>> >
>> > Hi Kyle,
>> >
>> > thank you for creating the JIRA ticket, I think my best option right
>> > now is to wait for a Beam version that is running on Flink 1.10 then
>> > - unless there is a new Beam release around the corner :)
>> >
>> > Best,
>> > Tobi
>> >
>> > On Thu, Feb 20, 2020 at 11:52 PM Kyle Weaver <kcwea...@google.com
>> > <mailto:kcwea...@google.com>> wrote:
>> >
>> > Hi Tobi,
>> >
>> > This seems like a bug with Beam 2.19. I filed
>> > https://issues.apache.org/jira/browse/BEAM-9345 to track the
>> issue.
>> >
>> >  > What puzzles me is that the session cluster should be allowed
>> > to have multiple environments in detached mode - or am I wrong?
>> >
>> > It looks like that check is removed in Flink 1.10:
>> > https://issues.apache.org/jira/browse/FLINK-15201
>> >
>> > Thanks for reporting.
>> > Kyle
>> >
>> > On Thu, Feb 20, 2020 at 4:10 AM Kaymak, Tobias
>> > <tobias.kay...@ricardo.ch <mailto:tobias.kay...@ricardo.ch>>
>> wrote:
>> >
>> > Hello,
>> >
>> > I am trying to upgrade from a Flink session cluster 1.8 to
>> > 1.9 and from Beam 2.16.0 to 2.19.0.
>> > Everything went quite smoothly, the local runner and the
>> > local Flink runner work flawlessly.
>> >
>> > However when I:
>> >1. Generate a Beam jar for the FlinkRunner via maven (mvn
>> > package -PFlinkRunner)
>> >2. Glue that into a Flink 1.9 docker image
>> >3. Start the image as a Standalone Session Cluster
>> >
>> > When I try to launch the first pipeline I get the following
>> > exception
>> >
>> > org.apache.flink.client.program.ProgramInvocationException:
>> > The main method caused an error: Failed to construct
>> > instance from factory method
>> > FlinkRunner#fromOptions(interface
>> > org.apache.beam.sdk.options.PipelineOptions)
>> >  at
>> >
>>  
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>> >  at
>> >
>>  
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> >  at
>> >
>>  org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>> >  at
>> >
>>  org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>> >  at
>> >
>>  org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>> >  at
>> >
>>  org.apache.flink.client.cli.CliFrontend.run

Re: Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-02-26 Thread Kaymak, Tobias
Hello,

Happy to help with testing! I am currently fixing a networking issue between
our dev cluster for integration tests and the Kafka it is consuming from.
After that I will be ready to spin it up and test.

Best,
Tobi

On Mon, Feb 24, 2020 at 10:13 PM Maximilian Michels  wrote:

> Thank you for reporting / filing / collecting the issues.
>
> There is a fix pending: https://github.com/apache/beam/pull/10950
>
> As for the upgrade issues, the 1.8 and 1.9 upgrade is trivial. I will
> check out the Flink 1.10 PR tomorrow.
>
> Cheers,
> Max
>
> On 24.02.20 09:26, Ismaël Mejía wrote:
> > We are cutting the release branch for 2.20.0 next wednesday, so not sure
> > if these tickets will make it, but hopefully.
> >
> > For ref,
> > BEAM-9295 Add Flink 1.10 build target and Make FlinkRunner compatible
> > with Flink 1.10
> > BEAM-9299 Upgrade Flink Runner to 1.8.3 and 1.9.2
> >
> > In any case if you have cycles to help test any of the related tickets
> > PRs that would help too.
> >
> >
> > On Mon, Feb 24, 2020 at 8:47 AM Kaymak, Tobias <tobias.kay...@ricardo.ch
> > <mailto:tobias.kay...@ricardo.ch>> wrote:
> >
> > Hi Kyle,
> >
> > thank you for creating the JIRA ticket, I think my best option right
> > now is to wait for a Beam version that is running on Flink 1.10 then
> > - unless there is a new Beam release around the corner :)
> >
> > Best,
> > Tobi
> >
> > On Thu, Feb 20, 2020 at 11:52 PM Kyle Weaver <kcwea...@google.com
> > <mailto:kcwea...@google.com>> wrote:
> >
> > Hi Tobi,
> >
> > This seems like a bug with Beam 2.19. I filed
> > https://issues.apache.org/jira/browse/BEAM-9345 to track the
> issue.
> >
> >  > What puzzles me is that the session cluster should be allowed
> > to have multiple environments in detached mode - or am I wrong?
> >
> > It looks like that check is removed in Flink 1.10:
> > https://issues.apache.org/jira/browse/FLINK-15201
> >
> > Thanks for reporting.
> > Kyle
> >
> > On Thu, Feb 20, 2020 at 4:10 AM Kaymak, Tobias
> > <tobias.kay...@ricardo.ch <mailto:tobias.kay...@ricardo.ch>>
> wrote:
> >
> > Hello,
> >
> > I am trying to upgrade from a Flink session cluster 1.8 to
> > 1.9 and from Beam 2.16.0 to 2.19.0.
> > Everything went quite smoothly, the local runner and the
> > local Flink runner work flawlessly.
> >
> > However when I:
> >1. Generate a Beam jar for the FlinkRunner via maven (mvn
> > package -PFlinkRunner)
> >2. Glue that into a Flink 1.9 docker image
> >3. Start the image as a Standalone Session Cluster
> >
> > When I try to launch the first pipeline I get the following
> > exception
> >
> > org.apache.flink.client.program.ProgramInvocationException:
> > The main method caused an error: Failed to construct
> > instance from factory method
> > FlinkRunner#fromOptions(interface
> > org.apache.beam.sdk.options.PipelineOptions)
> >  at
> >
>  
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> >  at
> >
>  
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> >  at
> >
>  org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> >  at
> >
>  org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> >  at
> >
>  org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> >  at
> >
>  org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> >  at
> >
>  
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> >  at
> >
>  org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> >  at
> >
>  
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> >  at
> >
>  org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> > Caused by: java.lang.RuntimeException: Failed to construct
> > instance from factory method
> >

Re: Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-02-23 Thread Kaymak, Tobias
Hi Kyle,

Thank you for creating the JIRA ticket. I think my best option right now is
to wait for a Beam version that runs on Flink 1.10, unless there is a new
Beam release around the corner :)

Best,
Tobi

On Thu, Feb 20, 2020 at 11:52 PM Kyle Weaver  wrote:

> Hi Tobi,
>
> This seems like a bug with Beam 2.19. I filed
> https://issues.apache.org/jira/browse/BEAM-9345 to track the issue.
>
> > What puzzles me is that the session cluster should be allowed to have
> multiple environments in detached mode - or am I wrong?
>
> It looks like that check is removed in Flink 1.10:
> https://issues.apache.org/jira/browse/FLINK-15201
>
> Thanks for reporting.
> Kyle
>
> On Thu, Feb 20, 2020 at 4:10 AM Kaymak, Tobias 
> wrote:
>
>> Hello,
>>
>> I am trying to upgrade from a Flink session cluster 1.8 to 1.9 and from
>> Beam 2.16.0 to 2.19.0.
>> Everything went quite smoothly, the local runner and the local Flink
>> runner work flawlessly.
>>
>> However when I:
>>   1. Generate a Beam jar for the FlinkRunner via maven (mvn package
>> -PFlinkRunner)
>>   2. Glue that into a Flink 1.9 docker image
>>   3. Start the image as a Standalone Session Cluster
>>
>> When I try to launch the first pipeline I get the following exception
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Failed to construct instance from factory method
>> FlinkRunner#fromOptions(interface
>> org.apache.beam.sdk.options.PipelineOptions)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> Caused by: java.lang.RuntimeException: Failed to construct instance from
>> factory method FlinkRunner#fromOptions(interface
>> org.apache.beam.sdk.options.PipelineOptions)
>> at
>> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>> at
>> org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
>> at
>> org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>> at
>> ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:180)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>> ... 9 more
>> Caused by: java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
>> ... 19 more
>> Caused by: org.apache.flink.api.common.InvalidProgramException: Multiple
>> environments cannot be created in detached mode
>> at
>> org.apache.flink.client.program.ContextEnvironmentFactory.createExecutionEnvironment(ContextEnvironmentFactory.java:67)
>> at java.util.Optional.map(Optional.java:215)
>> at
>> org.apache.flink.api.java.Executio

Beam 2.19.0 / Flink 1.9.1 - Session cluster error when submitting job "Multiple environments cannot be created in detached mode"

2020-02-20 Thread Kaymak, Tobias
Hello,

I am trying to upgrade a Flink session cluster from 1.8 to 1.9 and Beam from
2.16.0 to 2.19.0.
Everything went quite smoothly; the local runner and the local Flink runner
work flawlessly.

However, the problem appears after I:
  1. Generate a Beam jar for the FlinkRunner via Maven (mvn package -PFlinkRunner)
  2. Glue that into a Flink 1.9 Docker image
  3. Start the image as a Standalone Session Cluster

When I try to launch the first pipeline, I get the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to construct instance from factory method
FlinkRunner#fromOptions(interface
org.apache.beam.sdk.options.PipelineOptions)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Failed to construct instance from
factory method FlinkRunner#fromOptions(interface
org.apache.beam.sdk.options.PipelineOptions)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
    at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
    at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
    at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:180)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 9 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
    ... 19 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Multiple
environments cannot be created in detached mode
    at org.apache.flink.client.program.ContextEnvironmentFactory.createExecutionEnvironment(ContextEnvironmentFactory.java:67)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment(ExecutionEnvironment.java:1068)
    at org.apache.beam.runners.flink.translation.utils.Workarounds.restoreOriginalStdOutAndStdErrIfApplicable(Workarounds.java:43)
    at org.apache.beam.runners.flink.FlinkRunner.<init>(FlinkRunner.java:96)
    at org.apache.beam.runners.flink.FlinkRunner.fromOptions(FlinkRunner.java:90)
    ... 24 more

I've checked the release notes and the issues and couldn't find anything
that relates to this. What puzzles me is that the session cluster should be
allowed to have multiple environments in detached mode - or am I wrong?

Best,
Tobi
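
Editor's note: the trace in this message nests three "Caused by" levels, and only the innermost one (the InvalidProgramException) names the actual problem. The following is a minimal sketch of how such a chain can be unwrapped programmatically, for example in a launcher or test harness. The class RootCauseDemo and the method sampleFailure are hypothetical names introduced for illustration, and IllegalStateException is only a stand-in for Flink's InvalidProgramException so that the sketch runs without Flink on the classpath.

```java
import java.lang.reflect.InvocationTargetException;

class RootCauseDemo {

    /** Follows getCause() links until the innermost exception is reached. */
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        // Stop at the end of the chain; also guard against a self-referential cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Hypothetical stand-in that mimics the nesting seen in the trace:
     * outer error -> RuntimeException -> InvocationTargetException -> root cause.
     */
    public static Exception sampleFailure() {
        Exception root = new IllegalStateException(
                "Multiple environments cannot be created in detached mode");
        Exception reflective = new InvocationTargetException(root);
        Exception factory = new RuntimeException(
                "Failed to construct instance from factory method", reflective);
        return new Exception("The main method caused an error", factory);
    }

    public static void main(String[] args) {
        Throwable root = rootCause(sampleFailure());
        System.out.println(root.getClass().getSimpleName() + ": " + root.getMessage());
    }
}
```

Logging the root cause next to the outer message can make it easier to spot which layer (Beam, Flink, or user code) rejected the submission.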


Re: Using Beam 2.14.0 + Flink 1.8 with RocksDB state backend - perhaps missing dependency?

2019-08-14 Thread Kaymak, Tobias
It had nothing to do with Flink or Beam: the dependency was introduced by a
company-internal package that referred to an older version of RocksDB.

I spotted it by running

mvn dependency:tree

while investigating with a colleague. Excluding it fixed the issue for us.
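
Editor's note: besides inspecting the dependency tree, a conflict like this can sometimes be confirmed at runtime by asking the JVM where a class was loaded from and whether it exposes the method named in the NoSuchMethodError. The following is a minimal sketch under that assumption; ClasspathProbe, locationOf and hasPublicMethod are hypothetical names, and the defaults use java.lang.String only so that the sketch runs without RocksDB on the classpath.

```java
import java.security.CodeSource;
import java.util.Arrays;

class ClasspathProbe {

    /** Returns the JAR or directory a class was loaded from, if the JVM exposes it. */
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // Classes from the JDK itself usually have no code source.
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    /** Checks whether the loaded class has any public method with the given name. */
    public static boolean hasPublicMethod(Class<?> type, String name) {
        return Arrays.stream(type.getMethods())
                .anyMatch(method -> method.getName().equals(name));
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults are placeholders; pass a class name and a method name as arguments.
        String className = args.length > 0 ? args[0] : "java.lang.String";
        String methodName = args.length > 1 ? args[1] : "length";
        Class<?> type = Class.forName(className);
        System.out.println(className + " loaded from " + locationOf(type));
        System.out.println("has " + methodName + "(): " + hasPublicMethod(type, methodName));
    }
}
```

Run with the same classpath as the job and the arguments org.rocksdb.ColumnFamilyHandle getDescriptor, this would show which JAR supplied the class and whether that version has the method the state backend expects.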

On Tue, Aug 13, 2019 at 3:13 PM Kaymak, Tobias 
wrote:

> Ok I think I have an understanding of what happens - somehow.
> Flink switched their RocksDB fork in the 1.8 release, this is why the
> dependency must now be explicitly added to a project. [0]
> I did both actually, adding this dependency to my project's POM (resulting
> in beam_pipelines.jar) and to the lib directory of the Flink docker image
> to execute the pipeline [1]:
>
> FROM flink:1.8.0-scala_2.11
> ADD --chown=flink:flink
> http://central.maven.org/maven2/org/apache/flink/flink-statebackend-rocksdb_2.11/1.8.0/flink-statebackend-rocksdb_2.11-1.8.0.jar
> /opt/flink/lib/flink-statebackend-rocksdb_2.11-1.8.0.jar
> ADD --chown=flink:flink target/di-beam-bundled.jar
> /opt/flink/lib/beam_pipelines.jar
>
> Now everything works up to the point when I hit the "Stop" button in the
> Flink web interface. I think the dependency that the Beam Flink Runner has
> is wrong as Flink switched to FRocksDB in 1.8 [2]. I guess that's why the
> runner then hits the:
> java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.
> getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>
> But I might also be wrong, I am still investigating.
>
> Best,
> Tobi
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-the-per-job-state-backend
> [1] https://hub.docker.com/_/flink
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.8.html#rocksdb-version-bump-and-switch-to-frocksdb-flink-10471
>
> On Tue, Aug 13, 2019 at 2:50 PM Kaymak, Tobias 
> wrote:
>
>> This is a major issue for us as we are no longer able to do a
>> clean-shutdown of the pipelines right now - only cancelling them hard is
>> possible.
>>
>> On Tue, Aug 13, 2019 at 2:46 PM Kaymak, Tobias 
>> wrote:
>>
>>> I just rolled out the upgraded and working 1.8.0/2.14.0 combination to
>>> production and noticed that when I try to cleanly shut down a pipeline via
>>> the stop button in the web-interface of Flink 1.8.0 I get exactly the same
>>> error:
>>>
>>> java.lang.NoSuchMethodError:
>>> org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.tryDisposeAllOperators(StreamTask.java:454)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:337)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> The pipeline then restores from the last snapshot and continues to run,
>>> it does not shut-down as expected.
>>>
>>> Any idea why this could happen?
>>>
>>> On Mon, Aug 12, 2019 at 9:49 PM Kaymak, Tobias 
>>> wrote:
>>>
>>>> * each time :)
>>>>
>>>> On Mon, Aug 12, 2019 at 9:48 PM Kaymak, Tobias <
>>>> tobias.kay...@ricardo.ch> wrote:
>>>>
>>>>> I've checked multiple times now and it breaks as with the 1.8.1 image
>>>>> - I've completely rebuilt the Docker image and teared down the testing
>>>>> cluster.
>>>>>
>>>>> Best,
>>>>> Tobi
>>>>>
>>>>> On Mon, Aug 12, 2019 at 1:45 PM Maximilian Michels 
>>>>> wrote:
>>>>>
>>>>>> Hi Tobias!
>>>>>>
>>>>>> I've checked if there were any relevant changes to the RocksDB state
>>>>>> backend in 1.8.1, but I couldn't spot anything. Could it be that an old
>>>>>> version of RocksDB is still in the Flink cluster path?
>>>>>>
>>>>>> Cheers,
>>>>>> Max

Re: Using Beam 2.14.0 + Flink 1.8 with RocksDB state backend - perhaps missing dependency?

2019-08-13 Thread Kaymak, Tobias
Ok I think I have an understanding of what happens - somehow.
Flink switched its RocksDB fork in the 1.8 release, which is why the
dependency must now be added to a project explicitly. [0]
I actually did both: I added this dependency to my project's pom (resulting
in beam_pipelines.jar) and to the lib directory of the Flink Docker image
that executes the pipeline [1]:

FROM flink:1.8.0-scala_2.11
ADD --chown=flink:flink \
  http://central.maven.org/maven2/org/apache/flink/flink-statebackend-rocksdb_2.11/1.8.0/flink-statebackend-rocksdb_2.11-1.8.0.jar \
  /opt/flink/lib/flink-statebackend-rocksdb_2.11-1.8.0.jar
ADD --chown=flink:flink target/di-beam-bundled.jar \
  /opt/flink/lib/beam_pipelines.jar

Now everything works up to the point when I hit the "Stop" button in the
Flink web interface. I think the RocksDB dependency that the Beam Flink
Runner has is wrong, as Flink switched to FRocksDB in 1.8 [2]. I guess that
is why the runner then hits:
java.lang.NoSuchMethodError:
org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;

But I might also be wrong, I am still investigating.

Best,
Tobi

[0]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-the-per-job-state-backend
[1] https://hub.docker.com/_/flink
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.8.html#rocksdb-version-bump-and-switch-to-frocksdb-flink-10471

On Tue, Aug 13, 2019 at 2:50 PM Kaymak, Tobias 
wrote:

> This is a major issue for us as we are no longer able to do a
> clean-shutdown of the pipelines right now - only cancelling them hard is
> possible.
>
> On Tue, Aug 13, 2019 at 2:46 PM Kaymak, Tobias 
> wrote:
>
>> I just rolled out the upgraded and working 1.8.0/2.14.0 combination to
>> production and noticed that when I try to cleanly shutdown a pipeline via
>> the stop button in the web-interface of Flink 1.8.0 I get exactly the same
>> error:
>>
>> java.lang.NoSuchMethodError:
>> org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.tryDisposeAllOperators(StreamTask.java:454)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:337)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>> The pipeline then restores from the last snapshot and continues to run,
>> it does not shut-down as expected.
>>
>> Any idea why this could happen?
>>
>> On Mon, Aug 12, 2019 at 9:49 PM Kaymak, Tobias 
>> wrote:
>>
>>> * each time :)
>>>
>>> On Mon, Aug 12, 2019 at 9:48 PM Kaymak, Tobias 
>>> wrote:
>>>
>>>> I've checked multiple times now and it breaks as with the 1.8.1 image -
>>>> I've completely rebuilt the Docker image and teared down the testing
>>>> cluster.
>>>>
>>>> Best,
>>>> Tobi
>>>>
>>>> On Mon, Aug 12, 2019 at 1:45 PM Maximilian Michels 
>>>> wrote:
>>>>
>>>>> Hi Tobias!
>>>>>
>>>>> I've checked if there were any relevant changes to the RocksDB state
>>>>> backend in 1.8.1, but I couldn't spot anything. Could it be that an old
>>>>> version of RocksDB is still in the Flink cluster path?
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On 06.08.19 16:43, Kaymak, Tobias wrote:
>>>>> > And of course the moment I click "send" I find that: 😂
>>>>> >
>>>>> > If you use Scala 2.11 and dependency version 1.8.0 in your Beam
>>>>> projects
>>>>> > pom.xml it *does* work:
>>>>> >
>>>>> > 
>>>>> > org.apache.flink
>>>>> > flink-statebackend-rocksdb_2.11
>>>>> > 1.8.0
>>>>> > 
>>>>> >
>>>>> > However, if you want to use 1.8.1 - it *does not*.
>>>>> >
>>>>

Re: Using Beam 2.14.0 + Flink 1.8 with RocksDB state backend - perhaps missing dependency?

2019-08-13 Thread Kaymak, Tobias
This is a major issue for us as we are no longer able to do a
clean-shutdown of the pipelines right now - only cancelling them hard is
possible.

On Tue, Aug 13, 2019 at 2:46 PM Kaymak, Tobias 
wrote:

> I just rolled out the upgraded and working 1.8.0/2.14.0 combination to
> production and noticed that when I try to cleanly shutdown a pipeline via
> the stop button in the web-interface of Flink 1.8.0 I get exactly the same
> error:
>
> java.lang.NoSuchMethodError:
> org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
> at
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.tryDisposeAllOperators(StreamTask.java:454)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:337)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
>
>
> The pipeline then restores from the last snapshot and continues to run, it
> does not shut-down as expected.
>
> Any idea why this could happen?
>
> On Mon, Aug 12, 2019 at 9:49 PM Kaymak, Tobias 
> wrote:
>
>> * each time :)
>>
>> On Mon, Aug 12, 2019 at 9:48 PM Kaymak, Tobias 
>> wrote:
>>
>>> I've checked multiple times now and it breaks as with the 1.8.1 image -
>>> I've completely rebuilt the Docker image and teared down the testing
>>> cluster.
>>>
>>> Best,
>>> Tobi
>>>
>>> On Mon, Aug 12, 2019 at 1:45 PM Maximilian Michels 
>>> wrote:
>>>
>>>> Hi Tobias!
>>>>
>>>> I've checked if there were any relevant changes to the RocksDB state
>>>> backend in 1.8.1, but I couldn't spot anything. Could it be that an old
>>>> version of RocksDB is still in the Flink cluster path?
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On 06.08.19 16:43, Kaymak, Tobias wrote:
>>>> > And of course the moment I click "send" I find that: 😂
>>>> >
>>>> > If you use Scala 2.11 and dependency version 1.8.0 in your Beam
>>>> projects
>>>> > pom.xml it *does* work:
>>>> >
>>>> > 
>>>> > org.apache.flink
>>>> > flink-statebackend-rocksdb_2.11
>>>> > 1.8.0
>>>> > 
>>>> >
>>>> > However, if you want to use 1.8.1 - it *does not*.
>>>> >
>>>> > I still found it confusing, as I am using the official Flink Docker
>>>> > images which are currently at version 1.8.1. It would have helped me
>>>> if
>>>> > Beam would bundle the statebackend dependency (as already mentioned
>>>> Beam
>>>> > allows the user to set a state backend via parameters of the
>>>> Flink Runner).
>>>> >
>>>> > On Tue, Aug 6, 2019 at 4:35 PM Kaymak, Tobias <
>>>> tobias.kay...@ricardo.ch
>>>> > <mailto:tobias.kay...@ricardo.ch>> wrote:
>>>> >
>>>> > Hello,
>>>> >
>>>> > Flink requires in version 1.8, that if one wants to use RocksDB
>>>> as a
>>>> > state backend, that dependency has to be added to the
>>>> pom.xml file. [0]
>>>> >
>>>> > My cluster stopped working with RocksDB so I did added this
>>>> > dependency to the pom.xml of my Beam project (I've tried 1.8.1 and
>>>> > 1.8.0):
>>>> >
>>>> >   
>>>> > org.apache.flink
>>>> >
>>>> flink-statebackend-rocksdb_2.11
>>>> > 1.8.0
>>>> > 
>>>> >
>>>> > I also tried to instead add
>>>> > the flink-statebackend-rocksdb_2.11-1.8.0.jar to the lib directory
>>>> > of the Flink cluster instead (TaskManagers and JobManager) in all
>>>> > cases I get this error:
>>>> >
>>>> 

Re: Using Beam 2.14.0 + Flink 1.8 with RocksDB state backend - perhaps missing dependency?

2019-08-13 Thread Kaymak, Tobias
I just rolled out the upgraded and working 1.8.0/2.14.0 combination to
production and noticed that when I try to cleanly shut down a pipeline via
the stop button in the web interface of Flink 1.8.0, I get exactly the same
error:

java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
  at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
  at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
  at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.tryDisposeAllOperators(StreamTask.java:454)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:337)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
  at java.lang.Thread.run(Thread.java:748)


The pipeline then restores from the last snapshot and continues to run; it
does not shut down as expected.

Any idea why this could happen?

On Mon, Aug 12, 2019 at 9:49 PM Kaymak, Tobias 
wrote:

> * each time :)
>
> On Mon, Aug 12, 2019 at 9:48 PM Kaymak, Tobias 
> wrote:
>
>> I've checked multiple times now and it breaks as with the 1.8.1 image -
>> I've completely rebuilt the Docker image and teared down the testing
>> cluster.
>>
>> Best,
>> Tobi
>>
>> On Mon, Aug 12, 2019 at 1:45 PM Maximilian Michels 
>> wrote:
>>
>>> Hi Tobias!
>>>
>>> I've checked if there were any relevant changes to the RocksDB state
>>> backend in 1.8.1, but I couldn't spot anything. Could it be that an old
>>> version of RocksDB is still in the Flink cluster path?
>>>
>>> Cheers,
>>> Max
>>>
>>> On 06.08.19 16:43, Kaymak, Tobias wrote:
>>> > And of course the moment I click "send" I find that: 😂
>>> >
>>> > If you use Scala 2.11 and dependency version 1.8.0 in your Beam
>>> projects
>>> > pom.xml it *does* work:
>>> >
>>> > 
>>> > org.apache.flink
>>> > flink-statebackend-rocksdb_2.11
>>> > 1.8.0
>>> > 
>>> >
>>> > However, if you want to use 1.8.1 - it *does not*.
>>> >
>>> > I still found it confusing, as I am using the official Flink Docker
>>> > images which are currently at version 1.8.1. It would have helped me if
>>> > Beam would bundle the statebackend dependency (as already mentioned
>>> Beam
>>> > allows the user to set a state backend via parameters of the
>>> Flink Runner).
>>> >
>>> > On Tue, Aug 6, 2019 at 4:35 PM Kaymak, Tobias <
>>> tobias.kay...@ricardo.ch
>>> > <mailto:tobias.kay...@ricardo.ch>> wrote:
>>> >
>>> > Hello,
>>> >
>>> > Flink requires in version 1.8, that if one wants to use RocksDB as
>>> a
>>> > state backend, that dependency has to be added to the
>>> pom.xml file. [0]
>>> >
>>> > My cluster stopped working with RocksDB so I did added this
>>> > dependency to the pom.xml of my Beam project (I've tried 1.8.1 and
>>> > 1.8.0):
>>> >
>>> >   
>>> > org.apache.flink
>>> >
>>> flink-statebackend-rocksdb_2.11
>>> > 1.8.0
>>> > 
>>> >
>>> > I also tried to instead add
>>> > the flink-statebackend-rocksdb_2.11-1.8.0.jar to the lib directory
>>> > of the Flink cluster instead (TaskManagers and JobManager) in all
>>> > cases I get this error:
>>> >
>>> > 2019-08-06 14:14:15,670 ERROR
>>> > org.apache.flink.streaming.runtime.tasks.StreamTask   -
>>> > Error during disposal of stream operator
>>> > java.lang.NoSuchMethodError:
>>> >
>>> org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>>> >   at
>>> >
>>> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>>> >   at
>>> >
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStat

Re: Using Beam 2.14.0 + Flink 1.8 with RocksDB state backend - perhaps missing dependency?

2019-08-12 Thread Kaymak, Tobias
* each time :)

On Mon, Aug 12, 2019 at 9:48 PM Kaymak, Tobias 
wrote:

> I've checked multiple times now and it breaks as with the 1.8.1 image -
> I've completely rebuilt the Docker image and teared down the testing
> cluster.
>
> Best,
> Tobi
>
> On Mon, Aug 12, 2019 at 1:45 PM Maximilian Michels  wrote:
>
>> Hi Tobias!
>>
>> I've checked if there were any relevant changes to the RocksDB state
>> backend in 1.8.1, but I couldn't spot anything. Could it be that an old
>> version of RocksDB is still in the Flink cluster path?
>>
>> Cheers,
>> Max
>>
>> On 06.08.19 16:43, Kaymak, Tobias wrote:
>> > And of course the moment I click "send" I find that: 😂
>> >
>> > If you use Scala 2.11 and dependency version 1.8.0 in your Beam projects
>> > pom.xml it *does* work:
>> >
>> > 
>> > org.apache.flink
>> > flink-statebackend-rocksdb_2.11
>> > 1.8.0
>> > 
>> >
>> > However, if you want to use 1.8.1 - it *does not*.
>> >
>> > I still found it confusing, as I am using the official Flink Docker
>> > images which are currently at version 1.8.1. It would have helped me if
>> > Beam would bundle the statebackend dependency (as already mentioned Beam
>> > allows the user to set a state backend via parameters of the
>> Flink Runner).
>> >
>> > On Tue, Aug 6, 2019 at 4:35 PM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>> >
>> > Hello,
>> >
>> > Flink requires in version 1.8, that if one wants to use RocksDB as a
>> > state backend, that dependency has to be added to the pom.xml file.
>> [0]
>> >
>> > My cluster stopped working with RocksDB so I did added this
>> > dependency to the pom.xml of my Beam project (I've tried 1.8.1 and
>> > 1.8.0):
>> >
>> >   
>> > org.apache.flink
>> > flink-statebackend-rocksdb_2.11
>> > 1.8.0
>> > 
>> >
>> > I also tried to instead add
>> > the flink-statebackend-rocksdb_2.11-1.8.0.jar to the lib directory
>> > of the Flink cluster instead (TaskManagers and JobManager) in all
>> > cases I get this error:
>> >
>> > 2019-08-06 14:14:15,670 ERROR
>> > org.apache.flink.streaming.runtime.tasks.StreamTask   -
>> > Error during disposal of stream operator
>> > java.lang.NoSuchMethodError:
>> >
>> org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>> >   at
>> >
>> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>> >   at
>> >
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
>> >   at
>> >
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
>> >   at
>> >
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
>> >   at
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>> >   at
>> >
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> >   at java.lang.Thread.run(Thread.java:748)
>> >
>> > This looks like a version mismatch to me, but I don't know how to
>> > solve it - could Beam maybe include the dependency for the RocksDB
>> > backend for Flink 1.8 or higher, as it allows to set this value via
>> > parameters for the Flink Runner? [1]
>> >
>> >
>> > [0]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#setting-the-per-job-state-backend
>> > [1]
>> https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner
>> >
>>
>>


Re: Using Beam 2.14.0 + Flink 1.8 with RocksDB state backend - perhaps missing dependency?

2019-08-12 Thread Kaymak, Tobias
I've checked multiple times now, and it breaks just as it does with the
1.8.1 image - I've completely rebuilt the Docker image and torn down the
testing cluster.

Best,
Tobi

On Mon, Aug 12, 2019 at 1:45 PM Maximilian Michels  wrote:

> Hi Tobias!
>
> I've checked if there were any relevant changes to the RocksDB state
> backend in 1.8.1, but I couldn't spot anything. Could it be that an old
> version of RocksDB is still in the Flink cluster path?
>
> Cheers,
> Max
>
> On 06.08.19 16:43, Kaymak, Tobias wrote:
> > And of course the moment I click "send" I find that: 😂
> >
> > If you use Scala 2.11 and dependency version 1.8.0 in your Beam projects
> > pom.xml it *does* work:
> >
> > 
> > org.apache.flink
> > flink-statebackend-rocksdb_2.11
> > 1.8.0
> > 
> >
> > However, if you want to use 1.8.1 - it *does not*.
> >
> > I still found it confusing, as I am using the official Flink Docker
> > images which are currently at version 1.8.1. It would have helped me if
> > Beam would bundle the statebackend dependency (as already mentioned Beam
> > allows the user to set a state backend via parameters of the
> Flink Runner).
> >
> > On Tue, Aug 6, 2019 at 4:35 PM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
> >
> > Hello,
> >
> > Flink requires in version 1.8, that if one wants to use RocksDB as a
> > state backend, that dependency has to be added to the pom.xml file.
> [0]
> >
> > My cluster stopped working with RocksDB so I did added this
> > dependency to the pom.xml of my Beam project (I've tried 1.8.1 and
> > 1.8.0):
> >
> >   
> > org.apache.flink
> > flink-statebackend-rocksdb_2.11
> > 1.8.0
> > 
> >
> > I also tried to instead add
> > the flink-statebackend-rocksdb_2.11-1.8.0.jar to the lib directory
> > of the Flink cluster instead (TaskManagers and JobManager) in all
> > cases I get this error:
> >
> > 2019-08-06 14:14:15,670 ERROR
> > org.apache.flink.streaming.runtime.tasks.StreamTask   -
> > Error during disposal of stream operator
> > java.lang.NoSuchMethodError:
> >
> org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
> >   at
> >
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
> >   at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
> >   at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
> >   at
> >
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
> >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
> >   at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >   at java.lang.Thread.run(Thread.java:748)
> >
> > This looks like a version mismatch to me, but I don't know how to
> > solve it - could Beam maybe include the dependency for the RocksDB
> > backend for Flink 1.8 or higher, as it allows to set this value via
> > parameters for the Flink Runner? [1]
> >
> >
> > [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#setting-the-per-job-state-backend
> > [1]
> https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner
> >
>
>


Re: WordCount example breaks for FlinkRunner (local) for 2.14.0

2019-08-07 Thread Kaymak, Tobias
I just figured out that it might not detect the right environment (local
execution). Passing

--flinkMaster=\[local\]

when running, like

java -cp target/word-count-beam-bundled-0.1.jar \
  org.apache.beam.examples.WordCount --runner=FlinkRunner --output=/tmp/out \
  --flinkMaster=\[local\]

does the trick too :)



On Wed, Aug 7, 2019 at 6:59 PM Lukasz Cwik  wrote:

> The uber jar you're using looks like it is incorrectly merging dependencies,
> which is causing the issue you reported.
>
> Please try using mvn to run the example directly:
> mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>  -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" \
>  -Pflink-runner
>
> Here are more details about running the quickstart on various runners:
> https://beam.apache.org/get-started/quickstart-java/
>
> Is there some quickstart guide or documentation which told you to use mvn
> package and then java -jar beam.jar that we should be updating?
>
> On Wed, Aug 7, 2019 at 9:27 AM Kaymak, Tobias 
> wrote:
>
>> Hello,
>>
>> after generating the examples in 2.14.0 via:
>>
>> mvn archetype:generate \
>>   -DarchetypeGroupId=org.apache.beam \
>>   -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
>>   -DarchetypeVersion=2.14.0 \
>>   -DgroupId=org.example \
>>   -DartifactId=word-count-beam \
>>   -Dversion="0.1" \
>>   -Dpackage=org.apache.beam.examples \
>>   -DinteractiveMode=false
>>
>> compiling it via:
>>
>> mvn clean package -Pflink-runner
>>
>> and running it via the FlinkRunner (local):
>>
>> java -cp target/word-count-beam-bundled-0.1.jar
>> org.apache.beam.examples.WordCount --runner=FlinkRunner --output=/tmp/out
>>
>> I get the following error:
>>
>> Exception in thread "main" java.lang.RuntimeException: Pipeline execution
>> failed
>> at
>> org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
>>
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>
>> at
>> org.apache.beam.examples.WordCount.runWordCount(WordCount.java:185)
>>
>> at org.apache.beam.examples.WordCount.main(WordCount.java:192)
>>
>> Caused by: com.typesafe.config.ConfigException$UnresolvedSubstitution:
>> reference.conf @
>> jar:file:/private/tmp/word-count-beam/target/word-count-beam-bundled-0.1.jar!/reference.conf:
>> 804: Could not resolve
>>  substitution to a value: ${akka.stream.materializer}
>>
>> at
>> com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)
>>
>>
>> Should I file an issue in Jira or am I doing something wrong?
>>
>


WordCount example breaks for FlinkRunner (local) for 2.14.0

2019-08-07 Thread Kaymak, Tobias
Hello,

after generating the examples in 2.14.0 via:

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.beam \
  -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
  -DarchetypeVersion=2.14.0 \
  -DgroupId=org.example \
  -DartifactId=word-count-beam \
  -Dversion="0.1" \
  -Dpackage=org.apache.beam.examples \
  -DinteractiveMode=false

compiling it via:

mvn clean package -Pflink-runner

and running it via the FlinkRunner (local):

java -cp target/word-count-beam-bundled-0.1.jar \
  org.apache.beam.examples.WordCount --runner=FlinkRunner --output=/tmp/out

I get the following error:

Exception in thread "main" java.lang.RuntimeException: Pipeline execution failed
  at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
  at org.apache.beam.examples.WordCount.runWordCount(WordCount.java:185)
  at org.apache.beam.examples.WordCount.main(WordCount.java:192)
Caused by: com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:/private/tmp/word-count-beam/target/word-count-beam-bundled-0.1.jar!/reference.conf: 804: Could not resolve substitution to a value: ${akka.stream.materializer}
  at com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)

Should I file an issue in Jira or am I doing something wrong?
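
Note: the reply in this thread attributes the error to the bundled jar
merging dependencies incorrectly. One way to look at that is to list every
copy of reference.conf that the classpath can see. The following is only a
sketch under assumptions: the class name ResourceCopies and the helper
findAll are made up for illustration, and it uses nothing but the JDK.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class ResourceCopies {

    /** Returns the URL of every classpath entry that provides the named resource. */
    static List<URL> findAll(String resourceName) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // Fall back to the system class loader if no context loader is set.
            loader = ClassLoader.getSystemClassLoader();
        }
        return Collections.list(loader.getResources(resourceName));
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the Typesafe Config file named in the error message.
        String name = args.length > 0 ? args[0] : "reference.conf";
        List<URL> copies = findAll(name);
        System.out.println(copies.size() + " copies of " + name);
        for (URL url : copies) {
            System.out.println("  " + url);
        }
    }
}
```

Run against the individual dependency jars, this would normally list one
reference.conf per library that ships one; run against a single bundled jar
it can list at most one, because a jar holds only one entry per path. If that
one file is not the concatenation of the others, substitutions such as
${akka.stream.materializer} can end up unresolved. If the build uses the
Maven Shade plugin, its AppendingTransformer is a common way to concatenate
such files; whether that applies to this archetype is an assumption.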


Trying to rebuild the Timely/Stateful Processing example from the Beam Blog

2019-08-07 Thread Kaymak, Tobias
Hello,

during a hackathon we are trying to rebuild the batched RPC call example
written by Kenneth Knowles [0]. There is a question on SO about it that
I am trying to answer (as I think the windowing is not done correctly) [1].
While doing so I discovered that it is unclear to me how the
`staleSetState` variable in the last chapter, "Processing Time Timers", is
defined. It is not passed into the method, nor does it seem to be defined
above.

Is the full code of the example available somewhere?


[0] https://beam.apache.org/blog/2017/08/28/timely-processing.html
[1]
https://stackoverflow.com/questions/57221401/problem-with-state-and-timers-apache-beam


Re: Using Beam 2.14.0 + Flink 1.8 with RocksDB state backend - perhaps missing dependency?

2019-08-06 Thread Kaymak, Tobias
And of course the moment I click "send" I find that: 😂

If you use Scala 2.11 and dependency version 1.8.0 in your Beam project's
pom.xml it *does* work:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  <version>1.8.0</version>
</dependency>

However, if you want to use 1.8.1 - it *does not*.

I still found it confusing, as I am using the official Flink Docker images,
which are currently at version 1.8.1. It would have helped me if Beam
bundled the state backend dependency (as already mentioned, Beam allows the
user to set a state backend via parameters of the Flink Runner).

On Tue, Aug 6, 2019 at 4:35 PM Kaymak, Tobias 
wrote:

> Hello,
>
> Flink requires in version 1.8, that if one wants to use RocksDB as a state
> backend, that dependency has to be added to the pom.xml file. [0]
>
> My cluster stopped working with RocksDB so I did added this dependency to
> the pom.xml of my Beam project (I've tried 1.8.1 and 1.8.0):
>
>   
> org.apache.flink
> flink-statebackend-rocksdb_2.11
> 1.8.0
> 
>
> I also tried to instead add the flink-statebackend-rocksdb_2.11-1.8.0.jar
> to the lib directory of the Flink cluster instead (TaskManagers and
> JobManager) in all cases I get this error:
>
> 2019-08-06 14:14:15,670 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> during disposal of stream operator
> java.lang.NoSuchMethodError:
> org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>   at
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>   at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
>   at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
>   at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
>
> This looks like a version mismatch to me, but I don't know how to solve it
> - could Beam maybe include the dependency for the RocksDB backend for Flink
> 1.8 or higher, as it allows to set this value via parameters for the Flink
> Runner? [1]
>
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#setting-the-per-job-state-backend
> [1]
> https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner
>


Using Beam 2.14.0 + Flink 1.8 with RocksDB state backend - perhaps missing dependency?

2019-08-06 Thread Kaymak, Tobias
Hello,

As of version 1.8, Flink requires that the RocksDB dependency be added to
the pom.xml file explicitly if one wants to use RocksDB as a state
backend. [0]

My cluster stopped working with RocksDB, so I added this dependency to
the pom.xml of my Beam project (I've tried 1.8.1 and 1.8.0):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  <version>1.8.0</version>
</dependency>

I also tried adding the flink-statebackend-rocksdb_2.11-1.8.0.jar to the
lib directory of the Flink cluster instead (TaskManagers and JobManager).
In all cases I get this error:

2019-08-06 14:14:15,670 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during disposal of stream operator
java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
  at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
  at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
  at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
  at java.lang.Thread.run(Thread.java:748)

This looks like a version mismatch to me, but I don't know how to solve it.
Could Beam maybe include the dependency for the RocksDB backend for Flink
1.8 or higher, since it allows the state backend to be set via parameters
for the Flink Runner? [1]


[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#setting-the-per-job-state-backend
[1]
https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner
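
Note: a NoSuchMethodError like the one above usually means that the
org.rocksdb classes found at runtime come from a different build than the
one the state backend was compiled against. The following is a minimal
diagnostic sketch, not something taken from this thread: the class name
RocksDbClasspathCheck and the helper describe are made up for illustration,
and it relies only on JDK reflection. It reports which jar a class was
loaded from and whether it exposes a given public no-argument method.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

public class RocksDbClasspathCheck {

    /** Describes where a class comes from and whether it has a public no-arg method. */
    static String describe(String className, String methodName) {
        try {
            // Load without initializing, so no static initializers run.
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            Class<?> cls = Class.forName(className, false, loader);

            CodeSource source = cls.getProtectionDomain().getCodeSource();
            String location = (source == null || source.getLocation() == null)
                    ? "<bootstrap or unknown>"
                    : source.getLocation().toString();

            boolean hasMethod = false;
            for (Method m : cls.getMethods()) {
                if (m.getName().equals(methodName) && m.getParameterCount() == 0) {
                    hasMethod = true;
                }
            }
            return className + " loaded from " + location + "; "
                    + methodName + "() " + (hasMethod ? "present" : "MISSING");
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " not found on classpath";
        }
    }

    public static void main(String[] args) {
        // The class and method named in the NoSuchMethodError.
        System.out.println(describe("org.rocksdb.ColumnFamilyHandle", "getDescriptor"));
    }
}
```

To be meaningful it would have to run with the same classpath as the
TaskManager, for example the jars in the Flink lib directory plus the
bundled pipeline jar. If it prints MISSING, the reported location is a
candidate for the jar that shadows the expected RocksDB classes. Flink's
own class loading order for user code is not modeled here.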


Re: Using Beam 2.14.0 and Flink 1.8.1 - getting {"errors":["Not found."]} in the web ui

2019-08-06 Thread Kaymak, Tobias
I had a fat JAR that was using Flink 1.7 in its POM. After bumping that to
1.8 it works :)

On Mon, Aug 5, 2019 at 4:49 PM Kaymak, Tobias 
wrote:

> Hello,
>
> I've upgraded my dependencies from Beam 2.12 to Beam 2.14 and from Flink
> 1.7.2 to 1.8.1. I've increased my log level down to INFO for all
> components, the flink job manager looks fine.
>
> I can launch jobs via the cmdline and I can see them when I run `flink
> list` - but the webinterface is returning {"errors":["Not found."]} when
> I try to access it on port 8081. I've checked the release notes for Flink
> and Beam, but I could not spot anything.
>
> Any suggestions?
>


Using Beam 2.14.0 and Flink 1.8.1 - getting {"errors":["Not found."]} in the web ui

2019-08-05 Thread Kaymak, Tobias
Hello,

I've upgraded my dependencies from Beam 2.12 to Beam 2.14 and from Flink
1.7.2 to 1.8.1. I've set my log level to INFO for all components, and
the Flink job manager looks fine.

I can launch jobs via the command line and I can see them when I run `flink
list`, but the web interface returns {"errors":["Not found."]} when I
try to access it on port 8081. I've checked the release notes for Flink and
Beam, but I could not spot anything.

Any suggestions?


Re: Apache BEAM on Flink in production

2019-05-28 Thread Kaymak, Tobias
We (Ricardo.ch) are using it in production and I will share our use-case
during the upcoming Apache Beam Summit in Berlin in a few weeks. I don't
know if there will be recordings, but I will share the slides afterwards. :)
https://beamsummit.org/schedule/2019-06-20?sessionId=114625

On Thu, May 9, 2019 at 6:15 PM David Sabater Dinter 
wrote:

> Hi Stephen,
> There is another relevant video from GCP Next in which Lyft presented
> their use case, together with some really cool Dataflow announcements
> from Sergei, Dataflow PM @Google.
>
> Regards.
>
> *From: *Maximilian Michels 
> *Date: *Wed, 8 May 2019 at 10:16
>
> Hi Stephen,
>>
>> Apart from Lyft there are many Beam users on Flink. I'm not sure I can
>> publicly say their names, but I have been working with different
>> companies running multiple production pipelines with Beam on Flink.
>>
>> Since you mentioned ETL, most of them are actually using streaming
>> pipelines.
>>
>> Thanks,
>> Max
>>
>> On 07.05.19 18:17, Austin Bennett wrote:
>> > On the Beam YouTube channel:
>> > https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ you can see two
>> > talks from people at Lyft; they use Beam on Flink.
>> >
>> > Other users can also chime in as to how they are running.
>> >
>> > Would also suggest coming to BeamSummit.org in Berlin in June and/or
>> > sharing experiences or coming to ApacheCon in September, where we are to
>> > have 2 tracks in each of 2 days focused on Beam
>> > https://www.apachecon.com/acna19/index.html
>> >
>> >
>> >
>> >
>> > On Tue, May 7, 2019 at 6:52 AM > > > wrote:
>> >
>> > Hi all,
>> >
>> > We currently run Apache Flink based data load processes (fairly
>> > simple streaming ETL jobs) and are looking at converting to Apache
>> > BEAM to give more flexibility on the runner.
>> >
>> > Is anyone aware of any organisations running Apache BEAM on Flink in
>> > production? Does anyone have any case studies they would be able to
>> > share?
>> >
>> > Many thanks,
>> >
>> > Steve
>> >
>> >

Re: Running Beam 2.11 in Flink 1.6.2: All metrics 0 for Solr pipeline

2019-04-06 Thread Kaymak, Tobias
Thanks for the explanation!

On Fri, Apr 5, 2019 at 1:26 PM Maximilian Michels  wrote:

> Hi Tobias,
>
> Though not intuitive, this is expected behavior.
>
> First of all, you see just a single operator in the web UI because all
> tasks are "chained", i.e. combined to execute directly one after another
> inside a single operator. This is an optimization technique of Flink.
> Chaining is done whenever the data is not redistributed (shuffled)
> between two successive operators.
>
> Now, for the metrics of this operator:
>
> "Bytes/Records received" counts how many bytes were received by the
> Flink operator from an upstream task. There is no upstream task for this
> chained operator because it reads from Kafka.
>
> "Bytes/Records sent" counts how many bytes were sent by the Flink
> operator to a downstream task. There is no downstream task for this
> chained operator because it writes directly to Solr.
>
> I'm thinking, we might expose a Flink pipeline option to control
> chaining in Beam. However, users usually want to apply chaining because
> it is a great optimization technique.
>
> Thanks,
> Max
>
> On 05.04.19 12:02, Kaymak, Tobias wrote:
> > Hello,
> >
> > I am currently at a hackathon playing with a very simple Beam 2.11
> > pipeline in Java that reads from Kafka and writes to Solr with no
> > windowing applied (Just mapping the values).
> >
> > The pipeline works, but the metrics in the Flink web interface always
> > show 0 - which is kind of strange. Any idea what could cause this?
> >
> > What I find interesting: There is just one node created for the whole
> > operation when the Flink pipeline gets translated from Beam.
> >
> >  pipeline.apply("Read from Kafka",
> >          KafkaIO.<String, String>read()
> >              .withBootstrapServers(bootstrap)
> >              .withTopics(topics)
> >              .withKeyDeserializer(StringDeserializer.class)
> >              .withValueDeserializer(StringDeserializer.class)
> >              .updateConsumerProperties(consumerProperties)
> >              .withReadCommitted()
> >              .withTimestampPolicyFactory(withEventTs)
> >              .commitOffsetsInFinalize())
> >      .apply("Convert To Solr Document", ParDo.of(new ToSolrDocument()))
> >      .apply(SolrIO.write().to("orders_2").withConnectionConfiguration(conn));
> >  pipeline.run().waitUntilFinish();
> >
> >
> > Best,
> > Tobi
>
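
The chaining explanation quoted above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: the class and method names are hypothetical, every task is assumed to forward all records it handles, and none of this is Flink's actual metrics code. It shows why a job that collapses into a single chained task reports zero for both counters, while the same job split into two tasks does not.

```java
/** Toy model of per-task network counters; not Flink's actual implementation. */
final class ChainingMetricsSketch {

    /** Records received from an upstream task; the first task reads from an external source instead. */
    static long recordsReceived(int taskIndex, long records) {
        return taskIndex == 0 ? 0 : records;
    }

    /** Records sent to a downstream task; the last task writes to an external sink instead. */
    static long recordsSent(int taskIndex, int taskCount, long records) {
        return taskIndex == taskCount - 1 ? 0 : records;
    }

    public static void main(String[] args) {
        long records = 1_000;
        // Fully chained: Kafka read, mapping and Solr write all run in task 0 of 1.
        System.out.println("chained: received=" + recordsReceived(0, records)
                + ", sent=" + recordsSent(0, 1, records));
        // Hypothetical split into two tasks, e.g. after a shuffle between them.
        System.out.println("task 0 of 2: sent=" + recordsSent(0, 2, records));
        System.out.println("task 1 of 2: received=" + recordsReceived(1, records));
    }
}
```

In this model the counters only move when records cross a task boundary, which matches the description of the single-node job graph in the thread.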


Re: Flink 1.7.2 + Beam 2.11 error: The transform beam:transform:create_view:v1 is currently not supported.

2019-03-29 Thread Kaymak, Tobias
Can confirm that this is the issue, starting with streaming=True fixes it.

On Fri, Mar 29, 2019 at 11:53 AM Maximilian Michels  wrote:

> Hi Tobias,
>
> Thanks for reporting. Can confirm, this is a regression with the
> detection of the execution mode. Everything should work fine if you set
> the "streaming" flag to true. Will be fixed for the 2.12.0 release.
>
> Thanks,
> Max
>
> On 28.03.19 17:28, Lukasz Cwik wrote:
> > +dev <mailto:d...@beam.apache.org>
> >
> > On Thu, Mar 28, 2019 at 9:13 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
> >
> > Hello,
> >
> > I just upgraded to Flink 1.7.2 from 1.6.2 with my dev cluster and
> > from Beam 2.10 to 2.11 and I am seeing this error when starting my
> > pipelines:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error.
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> >     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> >     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> >     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> >     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> >     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> >     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> >     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> > Caused by: java.lang.UnsupportedOperationException: The transform
> > beam:transform:create_view:v1 is currently not supported.
> >     at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
> >     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
> >     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> >     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> >     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> >     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> >     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> >     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> >     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> >     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> >     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
> >     at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:68)
> >     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:111)
> >     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
> >     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
> >     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
> >     at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:175)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > 

Re: Flink 1.7.2 + Beam 2.11 error: The transform beam:transform:create_view:v1 is currently not supported.

2019-03-28 Thread Kaymak, Tobias
The pipeline code did not change and looks like the following:

pipeline
    .apply(
        KafkaIO.read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfigurableDeserializer.class)
            .updateConsumerProperties(
                ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
            .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
            .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
            .withReadCommitted()
            .withTimestampPolicyFactory(withEventTs)
            .commitOffsetsInFinalize())
    .apply(ParDo.of(new ToEventFn()))
    .apply(
        Window.into(new ZurichTimePartitioningWindowFn())
            // Fire a pane once bundleSize elements have arrived or refreshFrequency
            // of processing time has passed, whichever happens first.
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(bundleSize),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(refreshFrequency))))
            .withAllowedLateness(Duration.standardDays(14))
            .discardingFiredPanes())
    .apply(
        BigQueryIO.write()
            // Write via BigQuery load jobs, triggered every refreshFrequency.
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(refreshFrequency)
            .withNumFileShards(1)
            .to(partitionedTableDynamicDestinations)
            .withFormatFunction(
                (SerializableFunction)
                    KafkaToBigQuery::convertUserEventToTableRow)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));


On Thu, Mar 28, 2019 at 5:13 PM Kaymak, Tobias 
wrote:

> Hello,
>
> I just upgraded to Flink 1.7.2 from 1.6.2 with my dev cluster and from
> Beam 2.10 to 2.11 and I am seeing this error when starting my pipelines:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.lang.UnsupportedOperationException: The transform
> beam:transform:create_view:v1 is currently not supported.
>     at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
>     at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
>     at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:68)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:1

Flink 1.7.2 + Beam 2.11 error: The transform beam:transform:create_view:v1 is currently not supported.

2019-03-28 Thread Kaymak, Tobias
Hello,

I just upgraded to Flink 1.7.2 from 1.6.2 with my dev cluster and from Beam
2.10 to 2.11 and I am seeing this error when starting my pipelines:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.UnsupportedOperationException: The transform
beam:transform:create_view:v1 is currently not supported.
    at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
    at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:68)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:111)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:175)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 9 more

I found this open issue while googling:
https://jira.apache.org/jira/browse/BEAM-4301. It seems unrelated, but the
type of error message I am seeing makes me wonder.
I tried Flink 1.7.2 with Scala 2.11 and 2.12 without luck.
I also tried deleting all of Flink's state information (ha/ and snapshots/).
In the end I tried downgrading to Beam 2.10, and that worked.
Could it be that a bug was introduced in 2.11?

Best,
Tobi


Re: Beam pipeline should fail when one FILE_LOAD fails for BigQueryIO on Flink

2019-02-22 Thread Kaymak, Tobias
Actually the number of retries is 1000 by default, so it might not have
reached that number yet. I will have to test that again.
https://github.com/apache/beam/blob/38daf8c45b14a94665939b562ab947dc72ad8f8f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1901
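
To make the effect of the retry limit concrete, here is a minimal sketch of bounded versus unbounded retries. It is not Beam's load-job code: `runWithRetries` is a hypothetical helper, and treating a negative limit as "retry forever" is an assumption made for illustration. With a small limit the failure surfaces quickly; with a very large or unbounded one, a permanently failing load job keeps being retried and the pipeline appears healthy.

```java
import java.util.concurrent.Callable;

/** Sketch of bounded retries; a stand-in for a load-job retry loop, not Beam's code. */
final class BoundedRetrySketch {

    /**
     * Runs {@code job} until it succeeds. A negative {@code maxRetries} means retry
     * forever; otherwise the last failure is rethrown once the retries are used up.
     */
    static <T> T runWithRetries(Callable<T> job, int maxRetries) throws Exception {
        int failures = 0;
        while (true) {
            try {
                return job.call();
            } catch (Exception e) {
                failures++;
                // One initial attempt plus maxRetries retries are allowed.
                if (maxRetries >= 0 && failures > maxRetries) {
                    throw new IllegalStateException(
                            "Load job failed after " + failures + " attempts", e);
                }
            }
        }
    }

    public static void main(String[] args) {
        // A job that can never succeed, e.g. because of a schema mismatch.
        Callable<String> alwaysFails = () -> {
            throw new java.io.IOException("schema mismatch");
        };
        try {
            runWithRetries(alwaysFails, 3);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a limit of 3 the helper gives up after four attempts in total; a limit in the hundreds would take correspondingly longer to surface the same error.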

On Thu, Feb 14, 2019 at 1:05 PM Kaymak, Tobias 
wrote:

> Thank you Jeff,
>
> I think 1 is a bug and I am planning to report it in the bugtracker.
> Regarding 2, I now supply a fixed TableSchema with the expected fields to
> this pipeline, and I am ignoring unknown fields.
>
> Best,
> Tobi
>
> On Wed, Feb 13, 2019 at 5:03 PM Jeff Klukas  wrote:
>
>> To question 1, I also would have expected the pipeline to fail in the
>> case of files failing to load; I'm not sure why it's not. I thought the
>> BigQuery API returns a 400 level response code in the case of files failing
>> and that would bubble up to a pipeline execution error, but I haven't dug
>> through the code to verify that.
>>
>> As to question 2, the lack of a native map type in BigQuery means you're
>> in a bit of a tough spot. BigQuery loads from Avro handle this by modeling
>> the map type as an array of (key, value) structs [0]. You could modify your
>> payload to match that same sort of structure, which would transparently
>> handle future additions of new keys, but may not be as convenient for
>> querying.
>>
>> Otherwise, if you want to model the "log" structure as a struct in
>> BigQuery, I think you'd need to provide the schemas as a side input to
>> BigQueryIO. Perhaps you could have a branch of your pipeline look for new
>> keys in the payload and output a view of the schema.
>>
>> Another option would be to set ignoreUnknownValues() which would drop
>> values for any fields not existing in the BQ table. Dropping those values
>> may or may not be acceptable for your use case.
>>
>> [0]
>> https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#complex_types
>>
>> On Wed, Feb 13, 2019 at 9:27 AM Kaymak, Tobias 
>> wrote:
>>
>>> Hello,
>>>
>>> I have a BigQuery table which ingests a Protobuf stream from Kafka with
>>> a Beam pipeline. The Protobuf has a `log Map` column which
>>> translates to a field "log" of type RECORD with unknown fields in BigQuery.
>>>
>>> So I scanned my whole stream to know which schema fields to expect and
>>> created an empty daily-partitioned table in BigQuery with correct fields
>>> for this RECORD type.
>>>
>>> Now someone is pushing a new field into this RECORD and the import fails
>>> as the schema is not matching anymore. My pipeline is configured to use
>>> FILE_LOADS and so these do fail - but the Flink pipeline just continues and
>>> does not throw any error.
>>>
>>> My Beam version is 2.10-SNAPSHOT running on Flink 1.6, my two questions
>>> are:
>>>
>>> 1. How can I make the pipeline fail hard in this situation?
>>> 2. How can I prevent this from happening? If I create the table with the
>>> correct schema in the beginning the table schema seems to be overwritten
>>> and autodetected again (without the added field). This is causing the load
>>> to fail. If I alter the schema and add the field manually when the pipeline
>>> is running it fixes it, but by then I already have a dozen failed imports.
>>> The main issue seems to be that I have a Protobuf schema which only
>>> defines a Map type, that is not specific enough to create
>>> the full matching schema from it.
>>>
>>>
>>>


Re: Beam pipeline should fail when one FILE_LOAD fails for BigQueryIO on Flink

2019-02-14 Thread Kaymak, Tobias
Thank you Jeff,

I think 1 is a bug and I am planning to report it in the bugtracker.
Regarding 2, I now supply a fixed TableSchema with the expected fields to
this pipeline, and I am ignoring unknown fields.

Best,
Tobi

On Wed, Feb 13, 2019 at 5:03 PM Jeff Klukas  wrote:

> To question 1, I also would have expected the pipeline to fail in the case
> of files failing to load; I'm not sure why it's not. I thought the BigQuery
> API returns a 400 level response code in the case of files failing and that
> would bubble up to a pipeline execution error, but I haven't dug through
> the code to verify that.
>
> As to question 2, the lack of a native map type in BigQuery means you're
> in a bit of a tough spot. BigQuery loads from Avro handle this by modeling
> the map type as an array of (key, value) structs [0]. You could modify your
> payload to match that same sort of structure, which would transparently
> handle future additions of new keys, but may not be as convenient for
> querying.
>
> Otherwise, if you want to model the "log" structure as a struct in
> BigQuery, I think you'd need to provide the schemas as a side input to
> BigQueryIO. Perhaps you could have a branch of your pipeline look for new
> keys in the payload and output a view of the schema.
>
> Another option would be to set ignoreUnknownValues() which would drop
> values for any fields not existing in the BQ table. Dropping those values
> may or may not be acceptable for your use case.
>
> [0]
> https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#complex_types
>
> On Wed, Feb 13, 2019 at 9:27 AM Kaymak, Tobias 
> wrote:
>
>> Hello,
>>
>> I have a BigQuery table which ingests a Protobuf stream from Kafka with a
>> Beam pipeline. The Protobuf has a `log Map` column which
>> translates to a field "log" of type RECORD with unknown fields in BigQuery.
>>
>> So I scanned my whole stream to know which schema fields to expect and
>> created an empty daily-partitioned table in BigQuery with correct fields
>> for this RECORD type.
>>
>> Now someone is pushing a new field into this RECORD and the import fails
>> as the schema is not matching anymore. My pipeline is configured to use
>> FILE_LOADS and so these do fail - but the Flink pipeline just continues and
>> does not throw any error.
>>
>> My Beam version is 2.10-SNAPSHOT running on Flink 1.6, my two questions
>> are:
>>
>> 1. How can I make the pipeline fail hard in this situation?
>> 2. How can I prevent this from happening? If I create the table with the
>> correct schema in the beginning the table schema seems to be overwritten
>> and autodetected again (without the added field). This is causing the load
>> to fail. If I alter the schema and add the field manually when the pipeline
>> is running it fixes it, but by then I already have a dozen failed imports.
>> The main issue seems to be that I have a Protobuf schema which only
>> defines a Map type, that is not specific enough to create
>> the full matching schema from it.
>>
>>
>>
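
The suggestion quoted above, modeling the map as an array of (key, value) structs, can be sketched in plain Java. The assumptions here are loud ones: the map is taken to be `Map<String, String>` (the archive does not preserve the original type parameters), the field names `key` and `value` are illustrative, and plain `Map` objects stand in for BigQuery rows. The point is that a new map key becomes one more array element rather than a new column, so the table schema stays fixed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: flatten a map into repeated {key, value} rows so new keys need no schema change. */
final class MapAsKeyValueArraySketch {

    /** Turns each map entry into a row with a "key" field and a "value" field. */
    static List<Map<String, String>> toKeyValueRows(Map<String, String> log) {
        List<Map<String, String>> rows = new ArrayList<>();
        for (Map.Entry<String, String> entry : log.entrySet()) {
            Map<String, String> row = new LinkedHashMap<>();
            row.put("key", entry.getKey());
            row.put("value", entry.getValue());
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, String> log = new LinkedHashMap<>();
        log.put("browser", "firefox");
        // A key that did not exist when the table was created.
        log.put("new_field", "arrives without a schema change");
        System.out.println(toKeyValueRows(log));
    }
}
```

The trade-off mentioned in the thread still applies: querying a single key means filtering the repeated field instead of selecting a named column.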


Beam pipeline should fail when one FILE_LOAD fails for BigQueryIO on Flink

2019-02-13 Thread Kaymak, Tobias
Hello,

I have a BigQuery table which ingests a Protobuf stream from Kafka with a
Beam pipeline. The Protobuf has a `log Map` column which
translates to a field "log" of type RECORD with unknown fields in BigQuery.

So I scanned my whole stream to know which schema fields to expect and
created an empty daily-partitioned table in BigQuery with correct fields
for this RECORD type.

Now someone is pushing a new field into this RECORD and the import fails as
the schema is not matching anymore. My pipeline is configured to use
FILE_LOADS and so these do fail - but the Flink pipeline just continues and
does not throw any error.

My Beam version is 2.10-SNAPSHOT running on Flink 1.6, my two questions are:

1. How can I make the pipeline fail hard in this situation?
2. How can I prevent this from happening? If I create the table with the
correct schema in the beginning the table schema seems to be overwritten
and autodetected again (without the added field). This is causing the load
to fail. If I alter the schema and add the field manually when the pipeline
is running it fixes it, but by then I already have a dozen failed imports.
The main issue seems to be that I have a Protobuf schema which only defines
a Map type, that is not specific enough to create the full
matching schema from it.


Re: Dealing with "large" checkpoint state of a Beam pipeline in Flink

2019-02-12 Thread Kaymak, Tobias
Thank you! I am using similar values, but my problem was that my FILE_LOADS
were sometimes failing, and this led to this behavior. The pipeline didn't
fail though (which I was assuming it would); it simply retried the
loading forever. (Retries were set to the default (-1).)

On Tue, Feb 12, 2019 at 6:11 PM Juan Carlos Garcia 
wrote:

> I forgot to mention that we use HDFS as storage for checkpoints /
> savepoints.
>
> Juan Carlos Garcia  wrote on Tue, Feb 12, 2019,
> 18:03:
>
>> Hi Tobias,
>>
>> I think this can happen when there is a lot of backpressure on the
>> pipeline.
>>
>> I don't know if it's normal, but I have a pipeline reading from KafkaIO and
>> pushing to BigQuery in streaming mode, and I have seen checkpoints of almost
>> 1 GB. Whenever I take a savepoint to update the pipeline, it can
>> go up to 8 GB of data per savepoint.
>>
>> I am on Flink 1.5.x, on premises, also using RocksDB with incremental
>> checkpoints.
>>
>> So far my only solution to avoid errors while checkpointing or
>> savepointing is to make sure the checkpoint timeout is high enough, like 20
>> or 30 minutes.
>>
>>
>> Kaymak, Tobias  wrote on Tue, Feb 12, 2019,
>> 17:33:
>>
>>> Hi,
>>>
>>> my Beam 2.10-SNAPSHOT pipeline has a KafkaIO as input and a BigQueryIO
>>> configured with FILE_LOADS as output. What bothers me is that even if I
>>> configure in my Flink 1.6 configuration
>>>
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>>
>>> I see states that are as big as 230 MiB and checkpoint timeouts, or
>>> checkpoints that take longer than 10 minutes to complete (I just saw one
>>> that took longer than 30 minutes).
>>>
>>> Am I missing something? Is there some room for improvement? Should I use
>>> a different storage backend for the checkpoints? (Currently they are stored
>>> on GCS).
>>>
>>> Best,
>>> Tobi
>>>
>>


Dealing with "large" checkpoint state of a Beam pipeline in Flink

2019-02-12 Thread Kaymak, Tobias
Hi,

my Beam 2.10-SNAPSHOT pipeline has a KafkaIO as input and a BigQueryIO
configured with FILE_LOADS as output. What bothers me is that even if I
configure in my Flink 1.6 configuration

state.backend: rocksdb
state.backend.incremental: true

I see states that are as big as 230 MiB and checkpoint timeouts, or
checkpoints that take longer than 10 minutes to complete (I just saw one
that took longer than 30 minutes).

Am I missing something? Is there some room for improvement? Should I use a
different storage backend for the checkpoints? (Currently they are stored
on GCS).

Best,
Tobi


Re: No checkpoints for KafkaIO to BigQueryIO pipeline on Flink runner?

2019-01-30 Thread Kaymak, Tobias
Hi Maximilian,

I can confirm that checkpoints work with Beam 2.10-SNAPSHOT and do not work
with version 2.9. I am very sure it is related to this issue:
https://issues.apache.org/jira/browse/FLINK-2491 - which has been fixed in
2.10, since parts of the pipeline are FINISHED after a couple of minutes
and this then triggers the shutdown of the checkpoints. However, executing
the pipeline on a Flink 1.5.5 cluster yields no metrics about the elements
processed in the webinterface anymore:

2019-01-30 09:14:53,934 WARN
org.apache.beam.sdk.metrics.MetricsEnvironment - Reporting metrics are not
supported in the current execution environment.

Is this a known issue? I want to change my Flink version to 1.6 to see if
this is fixed, but I am unsure at this point how to achieve this. Is it
something I can pass in my pom.xml?





Best,
Tobi



On Tue, Jan 29, 2019 at 4:27 PM Maximilian Michels  wrote:

> Hi Tobias,
>
> It is normal to see "No restore state for UnbounedSourceWrapper" when not
> restoring from a checkpoint/savepoint.
>
> Just checking. You mentioned you set the checkpoint interval via:
> --checkpointingInterval=30
>
> That means you have to wait 5 minutes until the first checkpoint is taken.
> You should be seeing an INFO message like this: "INFO: Triggering
> checkpoint 1 @ 1548775459114 for job 3b5bdb811f1923bf49db24403e9c1ae9."
>
> Thanks,
> Max
>
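
The relation between the flag value and the five-minute wait mentioned above can be checked with a short calculation. This sketch assumes the Flink runner interprets `checkpointingInterval` in milliseconds; the helper name is hypothetical and only formats the flag.

```java
import java.time.Duration;

/** Sketch: derive the Flink runner flag value for a given checkpoint interval. */
final class CheckpointIntervalSketch {

    /** Formats the flag, assuming the option expects milliseconds. */
    static String checkpointingIntervalFlag(Duration interval) {
        return "--checkpointingInterval=" + interval.toMillis();
    }

    public static void main(String[] args) {
        // Five minutes, the waiting time mentioned in the reply.
        System.out.println(checkpointingIntervalFlag(Duration.ofMinutes(5)));
    }
}
```

Under that assumption a five-minute interval corresponds to a value of 300000, so a much smaller value would trigger checkpoints far more often than every five minutes.
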
> On 29.01.19 16:13, Kaymak, Tobias wrote:
> > Even after altering the pipeline and making it way more simple it still
> does not
> > checkpoint. (I used a single KafkaTopic as a source and altered the IO
> step the
> > following way:
> >
> >   .apply(
> >  BigQueryIO.write()
> >  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> >  .withTriggeringFrequency(refreshFrequency)
> >  .withNumFileShards(1)
> >  .to(projectId + ":" + dataset + "." + tableName)
> >  .withTimePartitioning(new
> > TimePartitioning().setField("event_date"))
> >  .withSchema(tableSchema)
> >  .withFormatFunction(
> >  (SerializableFunction)
> >  KafkaToBigQuery::convertUserEventToTableRow)
> >
> >
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >
> > .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> >
> > The graph that Flink 1.5.5 generated looked exactly the same, and
> > checkpointing still did not work.
> >
> > On Tue, Jan 29, 2019 at 11:05 AM Kaymak, Tobias <
> tobias.kay...@ricardo.ch
> > <mailto:tobias.kay...@ricardo.ch>> wrote:
> >
> > If I have a pipeline running and I restart the taskmanager on which
> it's
> > executing the log shows - I find the "No restore state for
> > UnbounedSourceWrapper." interesting, as it seems to indicate that the
> > pipeline never stored a state in the first place?
> >
> > Starting taskexecutor as a console application on host
> > flink-taskmanager-5d85dd6854-pm5bl.
> > 2019-01-29 09:20:48,706 WARN
> org.apache.hadoop.util.NativeCodeLoader
> > - Unable to load native-hadoop library for your
> platform...
> > using builtin-java classes where applicable
> > 2019-01-29 09:20:51,253 WARN
> > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
> SASL
> > configuration failed: javax.security.auth.login.LoginException: No
> JAAS
> > configuration section named 'Client' was found in specified JAAS
> > configuration file: '/tmp/jaas-7768141350028767113.conf'. Will
> continue
> > connection to Zookeeper server without SASL authentication, if
> Zookeeper
> > server allows it.
> > 2019-01-29 09:20:51,281 ERROR
> > org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
> > Authentication failed
> > 2019-01-29 09:21:53,814 WARN  org.apache.flink.metrics.MetricGroup
>
> >  - The operator name
> >
>  
> BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp)
> > exceeded the 80 characters length limit and was truncated.
> > 2019-01-29 09:21:53,828 WARN  org.apache.flink.metrics.MetricGroup
>
> >  - The operator name
> >
>  
> BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Window.Into()/Window.Assign.out
> > exceeded the 80 ch

Re: Tuning BigQueryIO.Write

2019-01-30 Thread Kaymak, Tobias
Hi,

I am currently playing around with BigQueryIO options, and I am not an
expert on it, but 60 workers sounds like a lot to me (or expensive
computation) for 10k records hitting 2 tables each.
Could you maybe share the code of your pipeline?

Cheers,
Tobi

On Tue, Jan 22, 2019 at 9:28 PM Jeff Klukas  wrote:

> I'm attempting to deploy a fairly simple job on the Dataflow runner that
> reads from PubSub and writes to BigQuery using file loads, but I have so
> far not been able to tune it to keep up with the incoming data rate.
>
> I have configured BigQueryIO.write to trigger loads every 5 minutes, and
> I've let the job autoscale up to a max of 60 workers (which it has done).
> I'm using dynamic destinations to hit 2 field-partitioned tables. Incoming
> data per table is ~10k events/second, so every 5 minutes each table should
> be ingesting on order 200k records of ~20 kB apiece.
>
> We don't get many knobs to turn in BigQueryIO. I have tested numShards
> between 10 and 1000, but haven't seen obvious differences in performance.
>
> Potentially relevant: I see a high rate of warnings on the shuffler. They
> consist mostly of LevelDB warnings about "Too many L0 files". There are
> occasionally some other warnings relating to memory as well. Would using
> larger workers potentially help?
>
> Does anybody have experience with tuning BigQueryIO writing? It's quite a
> complicated transform under the hood and it looks like there are several
> steps of grouping and shuffling data that could be limiting throughput.
>
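A rough sanity check on the figures quoted above, offered only as a sketch: taking "~10k events/second", a 5 minute load trigger, and "~20 kB apiece" at face value, the per-table volume per trigger comes out well above the "200k records" mentioned, so either the rate or the record count in the quoted message may be off. The class and method names below are made up for illustration, and decimal kilobytes are assumed.

```java
import java.time.Duration;

/** Back-of-the-envelope volume per BigQuery load trigger, per table. */
final class LoadVolumeEstimate {

    /** Number of records that accumulate between two load triggers. */
    static long recordsPerTrigger(long eventsPerSecond, Duration triggerInterval) {
        return eventsPerSecond * triggerInterval.getSeconds();
    }

    /** Total payload in bytes, assuming decimal kilobytes (1 kB = 1000 bytes). */
    static long bytesPerTrigger(long records, long kilobytesPerRecord) {
        return records * kilobytesPerRecord * 1_000L;
    }

    public static void main(String[] args) {
        // Figures taken from the quoted message: 10k events/s, 5 minutes, 20 kB each.
        long records = recordsPerTrigger(10_000L, Duration.ofMinutes(5));
        long bytes = bytesPerTrigger(records, 20L);
        System.out.println(records + " records, " + bytes / 1_000_000_000L + " GB per trigger");
    }
}
```

With those inputs the estimate is 3,000,000 records and about 60 GB per table per trigger, which would make the number of shards and the worker size matter much more than for 200k records.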


-- 
Tobias Kaymak
Data Engineer

tobias.kay...@ricardo.ch
www.ricardo.ch


Re: Is a window function necessary when using a KafkaIO source and a BigQueryIO sink?

2019-01-29 Thread Kaymak, Tobias
I am feeling a bit stupid, but I haven't had time to try out the different
possibilities to model the Kafka -> Partitioned-Table-in-BQ pipeline in
Beam, until now.
I am using the 2.10 snapshot version at the moment and your comment was on
point: After rewriting the pipeline (which limits it to deal only with a
single topic for input and output instead of many, but that is ok), this
works:

  pipeline
      .apply(
          KafkaIO.read()
              .withBootstrapServers(bootstrap)
              .withTopic(topic)
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializer(ConfigurableDeserializer.class)
              .updateConsumerProperties(
                  ImmutableMap.of(
                      InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
              .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
              .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
              .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
              .withReadCommitted()
              .withTimestampPolicyFactory(withEventTs)
              .commitOffsetsInFinalize())
      .apply(ParDo.of(new ToEventFn()))
      .apply(
          BigQueryIO.write()
              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
              .withTriggeringFrequency(refreshFrequency)
              .withNumFileShards(1)
              .to(projectId + ":" + dataset + "." + tableName)
              .withTimePartitioning(new TimePartitioning().setField("event_date"))
              .withSchema(tableSchema)
              .withFormatFunction(
                  (SerializableFunction) KafkaToBigQuery::convertUserEventToTableRow)
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
  pipeline.run().waitUntilFinish();

Thank you!

On Wed, Jan 30, 2019 at 2:42 AM Reza Rokni  wrote:

> Thanx Tobi very interesting ! Sorry for the long gaps in email, I am based
> out of Singapore :-)
>
> I wonder if this is coming from the use of .to(
> partitionedTableDynamicDestinations), which I am guessing accesses the
> IntervalWindow. With the time-column-based partitioning in my pipeline I
> just use the URI to the table.
>
> And thanx for raising the question in general; looking around I could not
> find a nice write-up of the possible BigQueryIO patterns with the various
> partitioned and sharded table options that are now available. Will put
> something together over the next week or so and post back here, might see
> if it's something that would suit a short blog as well.
>
> Cheers
>
> Reza
>
>
>
> On Tue, 29 Jan 2019 at 19:56, Kaymak, Tobias 
> wrote:
>
>> I am using FILE_LOADS and no timestamp column, but partitioned tables. In
>> this case I have seen the following error, when I comment the windowing
>> out (direct runner):
>>
>> Exception in thread "main"
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.ClassCastException:
>> org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to
>> org.apache.beam.sdk.transforms.windowing.IntervalWindow
>>
>> On Tue, Jan 29, 2019 at 9:11 AM Reza Rokni  wrote:
>>
>>> Also my BQ table was partitioned based on a TIMESTAMP column, rather
>>> than being ingestion time based partitioning.
>>>
>>> Cheers
>>> Reza
>>>
>>> On Tue, 29 Jan 2019 at 15:44, Reza Rokni  wrote:
>>>
>>>> Hya Tobi,
>>>>
>>>> When you mention failed do you mean you get an error on running the
>>>> pipeline or there is an incorrect data issue?
>>>>
>>>> I was just trying some things with a PubSub source and a partitioned
>>>> table sink and was able to push things through, it was a very simple
>>>> pipeline though, with BigQueryIO.to() set to a simple string.
>>>>
>>>> Cheers
>>>>
>>>> Reza
>>>>
>>>> On Mon, 28 Jan 2019 at 22:39, Kaymak, Tobias 
>>>> wrote:
>>>>
>>>>> But it seems like that it fails, when I remove the windowing from the
>>>>> pipeline, so I guess the answer is a no.
>>>>>
>>>>> On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <
>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>> Yes I am making use of partitioned tables, that's why I was wondering
>>>>>> if the windowing step could be skipped. :)
>>>>>>

Re: Is a window function necessary when using a KafkaIO source and a BigQueryIO sink?

2019-01-29 Thread Kaymak, Tobias
I am using FILE_LOADS and no timestamp column, but partitioned tables. In
this case I have seen the following error, when I comment the windowing out
(direct runner):

Exception in thread "main"
org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.ClassCastException:
org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to
org.apache.beam.sdk.transforms.windowing.IntervalWindow
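For readers who have not hit this before, here is a minimal sketch of the mechanism behind such a ClassCastException. It does not use the Beam classes: Win, GlobalWin and IntervalWin are hypothetical stand-ins, and the assumption is simply that a per-day destination function casts the element's window to an interval window. Once the Window.into step is removed, elements stay in the global window and that cast fails.

```java
/** Stand-in types; these are NOT the Beam classes, only a minimal imitation. */
interface Win {}

final class GlobalWin implements Win {}

final class IntervalWin implements Win {
    final long startMillis;

    IntervalWin(long startMillis) {
        this.startMillis = startMillis;
    }
}

final class WindowCastSketch {

    /** Assumes an interval window, as a per-day destination function might. */
    static long unsafeStart(Win window) {
        // Throws ClassCastException when the element is in the global window.
        return ((IntervalWin) window).startMillis;
    }

    /** Checks the window type first and fails with a readable message otherwise. */
    static long checkedStart(Win window) {
        if (window instanceof IntervalWin) {
            return ((IntervalWin) window).startMillis;
        }
        throw new IllegalStateException(
            "Expected an interval window but got " + window.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        System.out.println(unsafeStart(new IntervalWin(42L)));
        try {
            unsafeStart(new GlobalWin());
        } catch (ClassCastException e) {
            System.out.println("cast failed: no windowing was applied upstream");
        }
    }
}
```

So either the windowing stays in front of a window-based destination function, or the destination is derived from the element itself (for example a date field), in which case no interval window is needed.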

On Tue, Jan 29, 2019 at 9:11 AM Reza Rokni  wrote:

> Also my BQ table was partitioned based on a TIMESTAMP column, rather than
> being ingestion time based partitioning.
>
> Cheers
> Reza
>
> On Tue, 29 Jan 2019 at 15:44, Reza Rokni  wrote:
>
>> Hya Tobi,
>>
>> When you mention failed do you mean you get an error on running the
>> pipeline or there is an incorrect data issue?
>>
>> I was just trying some things with a PubSub source and a partitioned
>> table sink and was able to push things through, it was a very simple
>> pipeline though, with BigQueryIO.to() set to a simple string.
>>
>> Cheers
>>
>> Reza
>>
>> On Mon, 28 Jan 2019 at 22:39, Kaymak, Tobias 
>> wrote:
>>
>>> But it seems like that it fails, when I remove the windowing from the
>>> pipeline, so I guess the answer is a no.
>>>
>>> On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias <
>>> tobias.kay...@ricardo.ch> wrote:
>>>
>>>> Yes I am making use of partitioned tables, that's why I was wondering
>>>> if the windowing step could be skipped. :)
>>>>
>>>> Cheers
>>>> Tobi
>>>>
>>>> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni  wrote:
>>>>
>>>>> My apologies Tobi too quick to hit the send button :-(
>>>>>
>>>>> I was checking to ask if you had also looked at partition tables in
>>>>> BigQuery, assuming the only partitioning you are doing is by Day.
>>>>>
>>>>> Cheers
>>>>> Reza
>>>>>
>>>>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni  wrote:
>>>>>
>>>>>> Hi Tobi,
>>>>>>
>>>>>> Are you making use of partitioned tables in BigQuery or shard tables?
>>>>>>
>>>>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> Reza
>>>>>>
>>>>>>
>>>>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias <
>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>
>>>>>>> I was spending some time with the "Streaming Systems" [0] book over
>>>>>>> the weekend and I thought that my pipeline might be doing something "too
>>>>>>> much" as the BigQuery sink already should partition the data by day and 
>>>>>>> put
>>>>>>> it in the right place - so can my windowing function in the following
>>>>>>> pipeline be left out?
>>>>>>>
>>>>>>> I am asking this since sometimes I miss an element at the very edge
>>>>>>> of a window compared to a pipeline with a GCS sink and I thought maybe 
>>>>>>> that
>>>>>>> this is related to doing the same thing twice (windowing and then the 
>>>>>>> sink
>>>>>>> does "window" it again):
>>>>>>>
>>>>>>> pipeline
>>>>>>> .apply(
>>>>>>> KafkaIO.read()
>>>>>>> .withBootstrapServers(bootstrap)
>>>>>>> .withTopics(topics)
>>>>>>> .withKeyDeserializer(StringDeserializer.class)
>>>>>>>
>>>>>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>> .updateConsumerProperties(
>>>>>>>
>>>>>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>> inputMessagesConfig))
>>>>>>>
>>>>>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", 
>>>>>>> "earliest"))
>>>>>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>>>>>> groupId))
>>>>>>>
>>>>>>

Re: No checkpoints for KafkaIO to BigQueryIO pipeline on Flink runner?

2019-01-29 Thread Kaymak, Tobias
rting at offset 13478
2019-01-29 09:21:58,145 INFO
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-0:
reading from ratings-9 starting at offset 12966


On Mon, Jan 28, 2019 at 3:36 PM Kaymak, Tobias 
wrote:

> Hi Maximilian,
>
> yes, I've set the --runner to FlinkRunner when launching the pipeline and
> it does work for a GCS sink, but it seems to be ignored for a BigQuery sink
> somehow. Even though it looks like the system magically handles it itself.
>
> This is the full command line to launch the Beam 2.9.0 pipeline on Flink
> 1.5.5:
>
> bin/flink run -d -c di.beam.KafkaToBigQuery -j lib/beam_pipelines.jar
> --runner=FlinkRunner --appName=ratings --checkpointingMode=EXACTLY_ONCE
> --checkpointingInterval=30 --parallelism=1
> --tempLocation=gs://somebucket
>
> Here are the logs from the taskmanager, I can share the full code of the
> pipeline if you want:
>
> 2019-01-28 14:33:31,287 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/DropInputs/ParMultiDo(NoOp)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:31,911 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:31,976 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/ParDo(GarbageCollectTemporaryFiles)/ParMultiDo(GarbageCollectTemporaryFiles)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,217 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/SinglePartitionWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,227 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/View.AsSingleton/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,228 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,276 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/MultiPartitionsWriteTables/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,282 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - No restore state for UnbounedSourceWrapper.
> 2019-01-28 14:33:32,288 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - Unbounded Flink Source 0/1 is reading from sources:
> [org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@5fecdf95
> ]
> 2019-01-28 14:33:32,296 INFO
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
> - No restore state for UnbounedSourceWrapper.
> 2019-01-28 14:33:32,318 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,321 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,324 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/View.CreatePCollectionView/Combine.globally(Concatenate)/Values/Values/Map/ParMultiDo(Anonymous)
> exceeded the 80 characters length limit and was truncated.
> 2019-01-28 14:33:32,329 WARN  org.apache.flink.metrics.MetricGroup
>   - The operator name
> BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.Globa

Re: Is a window function necessary when using a KafkaIO source and a BigQueryIO sink?

2019-01-28 Thread Kaymak, Tobias
But it seems that it fails when I remove the windowing from the
pipeline, so I guess the answer is no.

On Mon, Jan 28, 2019 at 11:36 AM Kaymak, Tobias 
wrote:

> Yes I am making use of partitioned tables, that's why I was wondering if
> the windowing step could be skipped. :)
>
> Cheers
> Tobi
>
> On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni  wrote:
>
>> My apologies Tobi too quick to hit the send button :-(
>>
>> I was checking to ask if you had also looked at partition tables in
>> BigQuery, assuming the only partitioning you are doing is by Day.
>>
>> Cheers
>> Reza
>>
>> On Mon, 28 Jan 2019 at 17:22, Reza Rokni  wrote:
>>
>>> Hi Tobi,
>>>
>>> Are you making use of partitioned tables in BigQuery or shard tables?
>>>
>>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>>
>>> Cheers
>>>
>>> Reza
>>>
>>>
>>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias 
>>> wrote:
>>>
>>>> I was spending some time with the "Streaming Systems" [0] book over
>>>> the weekend and I thought that my pipeline might be doing something "too
>>>> much" as the BigQuery sink already should partition the data by day and put
>>>> it in the right place - so can my windowing function in the following
>>>> pipeline be left out?
>>>>
>>>> I am asking this since sometimes I miss an element at the very edge of
>>>> a window compared to a pipeline with a GCS sink and I thought maybe that
>>>> this is related to doing the same thing twice (windowing and then the sink
>>>> does "window" it again):
>>>>
>>>> pipeline
>>>> .apply(
>>>> KafkaIO.read()
>>>> .withBootstrapServers(bootstrap)
>>>> .withTopics(topics)
>>>> .withKeyDeserializer(StringDeserializer.class)
>>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>>> .updateConsumerProperties(
>>>>
>>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>> inputMessagesConfig))
>>>>
>>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>>> groupId))
>>>>
>>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>> .withReadCommitted()
>>>> .withTimestampPolicyFactory(withEventTs)
>>>> .commitOffsetsInFinalize())
>>>> .apply(ParDo.of(new ToEventFn()))
>>>>
>>>> // DELETE the following up to
>>>> .apply(
>>>> Window.into(new ZurichTimePartitioningWindowFn())
>>>>
>>>> .triggering(
>>>> Repeatedly.forever(
>>>> AfterFirst.of(
>>>> AfterPane.elementCountAtLeast(bundleSize),
>>>> AfterProcessingTime.pastFirstElementInPane()
>>>> .plusDelayOf(refreshFrequency))))
>>>> .withAllowedLateness(Duration.standardDays(14))
>>>> .discardingFiredPanes())
>>>> // HERE - END of DELETION
>>>>
>>>> .apply(
>>>> BigQueryIO.write()
>>>> .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>> .withTriggeringFrequency(refreshFrequency)
>>>> .withNumFileShards(1)
>>>> .to(partitionedTableDynamicDestinations)
>>>> .withFormatFunction(
>>>> (SerializableFunction)
>>>> KafkaToBigQuery::convertUserEventToTableRow)
>>>>
>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>
>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>
>>>> pipeline.run().waitUntilFinish();
>>>>
>>>>
>>>> Best,
>>>> Tobi
>>>>
>>>> [0] http://streamingsystems.net/
>>>>
>>>
>>>
>>> --
>>>

Re: No checkpoints for KafkaIO to BigQueryIO pipeline on Flink runner?

2019-01-28 Thread Kaymak, Tobias
oundedSource - Reader-0:
first record offset 3014


Best,
Tobi


On Mon, Jan 28, 2019 at 11:52 AM Maximilian Michels  wrote:

> Hi Tobias,
>
> Checkpointing should be enabled when you set it in the Flink config or via
> the
> Beam option `checkpointingInterval`. Did you set `runner` to `FlinkRunner`?
>
> If possible, could you share parts of the Flink logs?
>
> Thanks,
> Max
>
> On 25.01.19 15:14, Kaymak, Tobias wrote:
> > Hi,
> >
> > I am trying to migrate my existing KafkaToGCS pipeline to a
> KafkaToBigQuery
> > pipeline to skip the loading step from GCS which is currently handled
> externally
> > from Beam.
> >
> > I noticed that the pipeline, written in Beam 2.9.0 (Java) does not
> trigger any
> > checkpoint on Flink (1.5.5), even though its configured to do so when I
> launch
> > it. Is this normal? How does Beam then guarantee exactly once when there
> are no
> > checkpoints in Flink? (It seems to start from scratch when it crashes,
> during my
> > tests, but I am not 100% sure)
> >
> >
> > This is my pipeline:
> >
> >   pipeline
> >  .apply(
> >  KafkaIO.read()
> >  .withBootstrapServers(bootstrap)
> >  .withTopics(topics)
> >  .withKeyDeserializer(StringDeserializer.class)
> >  .withValueDeserializer(ConfigurableDeserializer.class)
> >  .updateConsumerProperties(
> >
> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
> > inputMessagesConfig))
> >
> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
> > "earliest"))
> >  .updateConsumerProperties(ImmutableMap.of("group.id
> > <http://group.id>", groupId))
> >
> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
> > "true"))
> >  .withReadCommitted()
> >  .withTimestampPolicyFactory(withEventTs)
> >  .commitOffsetsInFinalize())
> >  .apply(ParDo.of(new ToEventFn()))
> >  .apply(
> >  Window.into(new ZurichTimePartitioningWindowFn())
> >
> >  .triggering(
> >  Repeatedly.forever(
> >  AfterFirst.of(
> >  AfterPane.elementCountAtLeast(bundleSize),
> >  AfterProcessingTime.pastFirstElementInPane()
> >  .plusDelayOf(refreshFrequency))))
> >  .withAllowedLateness(Duration.standardDays(14))
> >  .discardingFiredPanes())
> >  .apply(
> >  BigQueryIO.write()
> >  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> >  .withTriggeringFrequency(refreshFrequency)
> >  .withNumFileShards(1)
> >  .to(partitionedTableDynamicDestinations)
> >  .withFormatFunction(
> >  (SerializableFunction)
> >  KafkaToBigQuery::convertUserEventToTableRow)
> >
> >
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >
> > .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> >
> >  pipeline.run().waitUntilFinish();
> > It's launched like the other (GCS) one via:
> >
> > ...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=30
> > --parallelism=1 --tempLocation=gs://foo..
> >
> > Any idea why checkpointing does not work here?
> >
> > Best,
> > Tobias
>


-- 
Tobias Kaymak
Data Engineer

tobias.kay...@ricardo.ch
www.ricardo.ch


Re: Is a window function necessary when using a KafkaIO source and a BigQueryIO sink?

2019-01-28 Thread Kaymak, Tobias
Yes I am making use of partitioned tables, that's why I was wondering if
the windowing step could be skipped. :)

Cheers
Tobi

On Mon, Jan 28, 2019 at 10:43 AM Reza Rokni  wrote:

> My apologies Tobi too quick to hit the send button :-(
>
> I was checking to ask if you had also looked at partition tables in
> BigQuery, assuming the only partitioning you are doing is by Day.
>
> Cheers
> Reza
>
> On Mon, 28 Jan 2019 at 17:22, Reza Rokni  wrote:
>
>> Hi Tobi,
>>
>> Are you making use of partitioned tables in BigQuery or shard tables?
>>
>> https://cloud.google.com/bigquery/docs/partitioned-tables
>>
>> Cheers
>>
>> Reza
>>
>>
>> On Mon, 28 Jan 2019 at 17:11, Kaymak, Tobias 
>> wrote:
>>
>>> I was spending some time with the "Streaming Systems" [0] book over
>>> the weekend and I thought that my pipeline might be doing something "too
>>> much" as the BigQuery sink already should partition the data by day and put
>>> it in the right place - so can my windowing function in the following
>>> pipeline be left out?
>>>
>>> I am asking this since sometimes I miss an element at the very edge of a
>>> window compared to a pipeline with a GCS sink and I thought maybe that this
>>> is related to doing the same thing twice (windowing and then the sink does
>>> "window" it again):
>>>
>>> pipeline
>>> .apply(
>>> KafkaIO.read()
>>> .withBootstrapServers(bootstrap)
>>> .withTopics(topics)
>>> .withKeyDeserializer(StringDeserializer.class)
>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>> .updateConsumerProperties(
>>>
>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>> inputMessagesConfig))
>>>
>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>> groupId))
>>>
>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>> .withReadCommitted()
>>> .withTimestampPolicyFactory(withEventTs)
>>> .commitOffsetsInFinalize())
>>> .apply(ParDo.of(new ToEventFn()))
>>>
>>> // DELETE the following up to
>>> .apply(
>>> Window.into(new ZurichTimePartitioningWindowFn())
>>>
>>> .triggering(
>>> Repeatedly.forever(
>>> AfterFirst.of(
>>> AfterPane.elementCountAtLeast(bundleSize),
>>> AfterProcessingTime.pastFirstElementInPane()
>>> .plusDelayOf(refreshFrequency))))
>>> .withAllowedLateness(Duration.standardDays(14))
>>> .discardingFiredPanes())
>>> // HERE - END of DELETION
>>>
>>> .apply(
>>> BigQueryIO.write()
>>> .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>> .withTriggeringFrequency(refreshFrequency)
>>> .withNumFileShards(1)
>>> .to(partitionedTableDynamicDestinations)
>>> .withFormatFunction(
>>> (SerializableFunction)
>>> KafkaToBigQuery::convertUserEventToTableRow)
>>>
>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>
>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>
>>> pipeline.run().waitUntilFinish();
>>>
>>>
>>> Best,
>>> Tobi
>>>
>>> [0] http://streamingsystems.net/
>>>
>>
>>
>> --
>>
>> This email may be confidential and privileged. If you received this
>> communication by mistake, please don't forward it to anyone else, please
>> erase all copies and attachments, and please let me know that it has gone
>> to the wrong person.
>>
>> The above terms reflect a potential business arrangement, are provided
>> solely as a basis for further discussion, and are not intended to be and do
>> not constitute a legally binding obligation. No legally binding obligations
>> will be created, implied, or inferred until an agreement in final form is
>> executed in writing by all parties involved.
>>
>
>
>


-- 
Tobias Kaymak
Data Engineer

tobias.kay...@ricardo.ch
www.ricardo.ch


Is a window function necessary when using a KafkaIO source and a BigQueryIO sink?

2019-01-28 Thread Kaymak, Tobias
I was spending some time with the "Streaming Systems" [0] book over
the weekend and I thought that my pipeline might be doing something "too
much" as the BigQuery sink already should partition the data by day and put
it in the right place - so can my windowing function in the following
pipeline be left out?

I am asking this since sometimes I miss an element at the very edge of a
window compared to a pipeline with a GCS sink and I thought maybe that this
is related to doing the same thing twice (windowing and then the sink does
"window" it again):

pipeline
    .apply(
        KafkaIO.read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfigurableDeserializer.class)
            .updateConsumerProperties(
                ImmutableMap.of(
                    InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
            .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
            .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
            .withReadCommitted()
            .withTimestampPolicyFactory(withEventTs)
            .commitOffsetsInFinalize())
    .apply(ParDo.of(new ToEventFn()))

    // DELETE the following up to
    .apply(
        Window.into(new ZurichTimePartitioningWindowFn())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(bundleSize),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(refreshFrequency))))
            .withAllowedLateness(Duration.standardDays(14))
            .discardingFiredPanes())
    // HERE - END of DELETION

    .apply(
        BigQueryIO.write()
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(refreshFrequency)
            .withNumFileShards(1)
            .to(partitionedTableDynamicDestinations)
            .withFormatFunction(
                (SerializableFunction) KafkaToBigQuery::convertUserEventToTableRow)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

pipeline.run().waitUntilFinish();


Best,
Tobi

[0] http://streamingsystems.net/
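As a side note on the "element at the very edge of a window" observation, here is a small sketch of how an event timestamp maps to a day partition in the Zurich time zone. It uses only java.time and is not the ZurichTimePartitioningWindowFn from the pipeline above; the class name and the yyyyMMdd suffix format are assumptions made for illustration. The point is that the day boundary sits at 23:00 UTC in winter, so two events one second apart can land in different daily partitions.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Maps an event timestamp to a daily partition suffix in Zurich local time. */
final class ZurichDayPartition {

    private static final ZoneId ZURICH = ZoneId.of("Europe/Zurich");

    // Formats an Instant as the local calendar day, e.g. 20190128.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZURICH);

    static String partitionFor(Instant eventTime) {
        return DAY.format(eventTime);
    }

    public static void main(String[] args) {
        // One second before and exactly at local midnight (UTC+1 in January).
        System.out.println(partitionFor(Instant.parse("2019-01-28T22:59:59Z")));
        System.out.println(partitionFor(Instant.parse("2019-01-28T23:00:00Z")));
    }
}
```

If the sink derives the partition from UTC or from processing time while the window function uses local time, elements near midnight are the ones that would be expected to differ between the two pipelines.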


No checkpoints for KafkaIO to BigQueryIO pipeline on Flink runner?

2019-01-25 Thread Kaymak, Tobias
Hi,

I am trying to migrate my existing KafkaToGCS pipeline to a KafkaToBigQuery
pipeline to skip the loading step from GCS which is currently handled
externally from Beam.

I noticed that the pipeline, written in Beam 2.9.0 (Java), does not trigger
any checkpoint on Flink (1.5.5), even though it's configured to do so when I
launch it. Is this normal? How does Beam then guarantee exactly-once when
there are no checkpoints in Flink? (During my tests it seems to start from
scratch when it crashes, but I am not 100% sure.)


This is my pipeline:

 pipeline
     .apply(
         KafkaIO.read()
             .withBootstrapServers(bootstrap)
             .withTopics(topics)
             .withKeyDeserializer(StringDeserializer.class)
             .withValueDeserializer(ConfigurableDeserializer.class)
             .updateConsumerProperties(
                 ImmutableMap.of(
                     InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
             .updateConsumerProperties(ImmutableMap.of("group.id", groupId))
             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
             .withReadCommitted()
             .withTimestampPolicyFactory(withEventTs)
             .commitOffsetsInFinalize())
     .apply(ParDo.of(new ToEventFn()))
     .apply(
         Window.into(new ZurichTimePartitioningWindowFn())
             .triggering(
                 Repeatedly.forever(
                     AfterFirst.of(
                         AfterPane.elementCountAtLeast(bundleSize),
                         AfterProcessingTime.pastFirstElementInPane()
                             .plusDelayOf(refreshFrequency))))
             .withAllowedLateness(Duration.standardDays(14))
             .discardingFiredPanes())
     .apply(
         BigQueryIO.write()
             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
             .withTriggeringFrequency(refreshFrequency)
             .withNumFileShards(1)
             .to(partitionedTableDynamicDestinations)
             .withFormatFunction(
                 (SerializableFunction) KafkaToBigQuery::convertUserEventToTableRow)
             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

 pipeline.run().waitUntilFinish();

It's launched like the other (GCS) one via:

...--checkpointingMode=EXACTLY_ONCE --checkpointingInterval=30
--parallelism=1 --tempLocation=gs://foo..

Any idea why checkpointing does not work here?

Best,
Tobias
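A note on the launch flags above, as a sketch rather than a diagnosis: the Beam Flink runner options describe checkpointingInterval as a value in milliseconds, so a bare 30 would mean 30 ms, not 30 seconds or minutes. Assuming that reading, a tiny helper like the hypothetical one below (the class and method names are made up) keeps the unit explicit when assembling the command line.

```java
import java.time.Duration;

/** Builds the checkpoint interval flag, assuming the value is in milliseconds. */
final class CheckpointFlag {

    static String forInterval(Duration interval) {
        // Duration.toMillis() makes the unit conversion explicit.
        return "--checkpointingInterval=" + interval.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(forInterval(Duration.ofMinutes(5)));
        System.out.println(forInterval(Duration.ofSeconds(30)));
    }
}
```

Under this assumption a 5 minute interval would be passed as 300000 and a 30 second interval as 30000.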


Re: Beam Metrics using FlinkRunner

2018-12-07 Thread Kaymak, Tobias
I am using the Flink Prometheus connector and I get metrics in Prometheus
for my running pipelines; I am not looking at metrics in the Flink
dashboard directly.
Have you tried that? (Talk with background and example:
https://github.com/mbode/flink-prometheus-example)

On Tue, Dec 4, 2018 at 7:49 PM Phil Franklin 
wrote:

> I’m having difficulty accessing Beam metrics when using FlinkRunner in
> streaming mode. I don’t get any metrics from MetricsPusher, though the same
> setup delivered metrics from SparkRunner.  Probably for the same reason
> that MetricsPusher doesn’t work, I also don’t get any output when I call an
> instance of MetricsHttpSink directly.  The problem seems to be that Flink
> never returns from pipeline.run(), an issue that others have referred to as
> FlinkRunner hanging.
>
> Is there a solution for getting metrics in this case that I’m missing?
>
> Thanks!
> -Phil



-- 
Tobias Kaymak
Data Engineer

tobias.kay...@ricardo.ch
www.ricardo.ch


Re: Experience with KafkaIO -> BigQueryIO

2018-11-09 Thread Kaymak, Tobias
On Fri, Nov 9, 2018 at 1:23 AM Raghu Angadi  wrote:

>
>>> That is fine. We can ignore the timestamp as possible suspect for
>>> debugging this. Using custom timestamps from records is normal.
>>>
>>>
>> Could you clarify of what you meant with "withTimestampFn() was never
>> meant to
>> be public"? I am using it to attach the right timestamp to an element to
>> be
>> able to window into days with the local time zone in the windowing
>> function. If
>> this is not used in the correct way could you tell me how I can do it
>> better?
>>
>
> The problem is with the watermark part of the policy. A source needs to
> provide both a timestamp for a record as well as a watermark for the stream.
> A TimestampPolicy in KafkaIO is responsible for both of these for each Kafka
> partition.
>
> `withTimestampFn()` lets the user provide a function to extract the
> timestamp. But for the watermark, it just uses the most recent record's
> timestamp. Say record A has timestamp 9:00:01 and arrives at 9:00:05, and B
> has a timestamp of 8:59:58 and arrives at 9:00:15.
> That implies that once A is processed at 9:00:05, your pipeline's watermark
> could jump to 9:00:01, which means an hourly window for [8:00, 9:00) will
> close. When B arrives 10 seconds later, it would be considered late. The
> problem is not just that such a watermark policy is too brittle, it is the
> fact that users have no idea that this is happening.
>
> Deprecation documentation for this API[1] suggests using
> `CustomTimestampPolicyWithLimitedDelay()` [2] instead.
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L100
> [2]:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L29
>
>
Thank you so much, that explanation was very helpful! I will adapt to the
new approach and continue digging into the missing-partitions problem
afterwards.
Tobi.
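
To make the A/B example above concrete, here is a small plain-Java replay of it. It is only a sketch and does not use Beam: the class name, the method name and the watermark model (maximum event time seen so far, minus a fixed delay) are assumptions for illustration. The real CustomTimestampPolicyWithLimitedDelay also handles idle partitions, and allowed lateness is ignored here. A delay of zero stands in for "watermark = most recent record's timestamp".

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WatermarkSketch {

  /**
   * Replays event timestamps in arrival order and returns those that arrive
   * after the watermark has already reached the end of their hourly window.
   * Assumed model: watermark = (max event time seen so far) - maxDelay.
   */
  public static List<Instant> lateRecords(List<Instant> eventTimes, Duration maxDelay) {
    List<Instant> late = new ArrayList<>();
    Instant watermark = Instant.MIN;
    for (Instant ts : eventTimes) {
      // End of the hourly window [hh:00, hh+1:00) that this record belongs to.
      Instant windowEnd = ts.truncatedTo(ChronoUnit.HOURS).plus(Duration.ofHours(1));
      if (!watermark.isBefore(windowEnd)) {
        late.add(ts);
      }
      // The watermark only ever moves forward.
      Instant candidate = ts.minus(maxDelay);
      if (candidate.isAfter(watermark)) {
        watermark = candidate;
      }
    }
    return late;
  }

  public static void main(String[] args) {
    // Record A (event time 9:00:01) arrives before record B (event time 8:59:58).
    List<Instant> arrivals = Arrays.asList(
        Instant.parse("2018-11-09T09:00:01Z"),
        Instant.parse("2018-11-09T08:59:58Z"));
    System.out.println("no delay:     " + lateRecords(arrivals, Duration.ZERO));
    System.out.println("1 min delay:  " + lateRecords(arrivals, Duration.ofMinutes(1)));
  }
}
```

With zero delay, B is reported as late because A has already pushed the watermark past 9:00:00; with a one-minute delay the watermark only reaches 8:59:01 after A, so B still lands in the [8:00, 9:00) window.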



> Raghu.
>
> After the rollback I am busy making the existing pipeline to GCS so robust
>> that
>> it never fails to deliver all files so that there is always a backup. As
>> I am
>> under a lot of pressure right now I don't want to fuck it up with
>> easy-to-avoid
>> mistakes and the GCS pipeline has the same logic, but just a different
>> sink
>> that uses a FileIO to write out different days to different "folders".
>>
>> Thank you,
>>
>> Tobi
>>
>>
>>
>>> Raghu.
>>>
>>>
>>>> I could also not fiddle with the timestamp at all and let the system
>>>> decide and
>>>> then in the BigQuery.IO partitioning step parse it and assign it to a
>>>> partition. Is this better?
>>>>
>>>>
>>>>
>>>>> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias <
>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am sharing my experience with you after trying to use the following
>>>>>> pipeline
>>>>>> logic (with Beam 2.6.0 - running on Flink 1.5):
>>>>>>
>>>>>> 1. Reading from KafkaIO, attaching a timestamp from each parsed
>>>>>> element
>>>>>> 2. Filtering bad records
>>>>>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch
>>>>>> jobs)
>>>>>> every 15 minutes
>>>>>>
>>>>>> I had a working pipeline that does not write to BigQuery directly,
>>>>>> but to
>>>>>> Cloud Storage, so it's 3rd step was
>>>>>>
>>>>>> 3. Writing files to GCS in daily "subdirectories"
>>>>>>
>>>>>> I tried to rewrite the pipeline to reduce complexity: Resetting it's
>>>>>> state
>>>>>> should no longer be tied to thinking about what to delete on GCS, also
>>>>>> configurable refresh times directly from within the Pipeline was
>>>>>> something I
>>>>>> was looking for. The thing that I needed to change was the output in
>>>>>> the end,
>>>>>> so knew my parsing logic would not change and that should reduce the
>>>>>> risk.
>>>>>>
>>>>>> I tested the pipeline within our testcluster and it looked promising.
>>>>>> When I
>>>>>> 

Re: Experience with KafkaIO -> BigQueryIO

2018-11-08 Thread Kaymak, Tobias
On Wed, Nov 7, 2018 at 6:49 PM Raghu Angadi  wrote:

>
> On Wed, Nov 7, 2018 at 5:04 AM Kaymak, Tobias 
> wrote:
>
>>
>> On Tue, Nov 6, 2018 at 6:58 PM Raghu Angadi  wrote:
>>
>>> You seem to be reading from multiple topics and your timestamp policy is
>>> too simplistic (withTimestampFn() was never meant to be public API, I am
>>> sending a PR to deprecate it first and then will make it package private).
>>> So if you have two topics of different sizes, the smaller topic might be
>>> consumed really fast, pushing your watermark way ahead. This might or might
>>> not be happening, but this is one of the dangers of using record timestamp
>>> for watermark (we should never do that).
>>>
>>>
>> To clarify: Test was done consuming from a single topic. I am using a
>> field
>> inside the element's JSON to get the element's timestamp. Data in a topic
>> can
>> go way back to let's say 2017, but that data was pushed to Kafka in one
>> go and
>> the timestamp when it arrived is for example wednesday last week.
>> Sometimes the
>> producing side does not set the element's timestamp for Kafka (since it's
>> using
>> a library that does not support that yet), so it has to be parsed.
>>
>
> That is fine. We can ignore the timestamp as possible suspect for
> debugging this. Using custom timestamps from records is normal.
>
>
Could you clarify what you meant by "withTimestampFn() was never meant to
be public"? I am using it to attach the right timestamp to an element so
that I can window into days in the local time zone in the windowing
function. If this is not the correct way to use it, could you tell me how
I can do it better?

After the rollback I am busy making the existing pipeline to GCS so robust
that it never fails to deliver all files so that there is always a backup.
As I am under a lot of pressure right now I don't want to fuck it up with
easy-to-avoid mistakes and the GCS pipeline has the same logic, but just a
different sink that uses a FileIO to write out different days to different
"folders".

Thank you,

Tobi
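
For reference, this is roughly what I mean by windowing into days in the local time zone, written as plain Java without Beam. It is a sketch only: the class and method names are made up for illustration, and the actual ZurichTimePartitioningWindowFn is not shown in this thread. The yyyyMMdd suffix is the day format used by BigQuery partition decorators (table$yyyyMMdd).

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class ZurichDaySketch {

  static final ZoneId ZURICH = ZoneId.of("Europe/Zurich");

  /** Local calendar day (Europe/Zurich) that contains the given event time. */
  public static LocalDate localDay(Instant eventTime) {
    return eventTime.atZone(ZURICH).toLocalDate();
  }

  /** Inclusive start of the daily window, as an instant. */
  public static Instant windowStart(Instant eventTime) {
    return localDay(eventTime).atStartOfDay(ZURICH).toInstant();
  }

  /** Exclusive end of the daily window; on DST changes the day is 23 or 25 hours long. */
  public static Instant windowEnd(Instant eventTime) {
    return localDay(eventTime).plusDays(1).atStartOfDay(ZURICH).toInstant();
  }

  /** The local day formatted as yyyyMMdd. */
  public static String partitionSuffix(Instant eventTime) {
    return localDay(eventTime).format(DateTimeFormatter.BASIC_ISO_DATE);
  }

  public static void main(String[] args) {
    // 23:30 UTC on Nov 5 is already 00:30 on Nov 6 in Zurich (UTC+1 in winter).
    Instant eventTime = Instant.parse("2018-11-05T23:30:00Z");
    System.out.println(windowStart(eventTime) + " .. " + windowEnd(eventTime));
    System.out.println(partitionSuffix(eventTime));
  }
}
```

The point of deriving both window bounds from the local date, instead of adding 24 hours to the start, is that the windows stay aligned with local midnight across daylight saving changes.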



> Raghu.
>
>
>> I could also not fiddle with the timestamp at all and let the system
>> decide and
>> then in the BigQuery.IO partitioning step parse it and assign it to a
>> partition. Is this better?
>>
>>
>>
>>> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am sharing my experience with you after trying to use the following
>>>> pipeline
>>>> logic (with Beam 2.6.0 - running on Flink 1.5):
>>>>
>>>> 1. Reading from KafkaIO, attaching a timestamp from each parsed element
>>>> 2. Filtering bad records
>>>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch
>>>> jobs)
>>>> every 15 minutes
>>>>
>>>> I had a working pipeline that does not write to BigQuery directly, but
>>>> to
>>>> Cloud Storage, so it's 3rd step was
>>>>
>>>> 3. Writing files to GCS in daily "subdirectories"
>>>>
>>>> I tried to rewrite the pipeline to reduce complexity: Resetting it's
>>>> state
>>>> should no longer be tied to thinking about what to delete on GCS, also
>>>> configurable refresh times directly from within the Pipeline was
>>>> something I
>>>> was looking for. The thing that I needed to change was the output in
>>>> the end,
>>>> so knew my parsing logic would not change and that should reduce the
>>>> risk.
>>>>
>>>> I tested the pipeline within our testcluster and it looked promising.
>>>> When I
>>>> deployed it last week everything seemed to go smoothly. On Friday I
>>>> noticed
>>>> that I had holes in the data: in the BigQuery tables there were missing
>>>> days
>>>> (tricky was that the recent days looked fine). (To be sure I reset the
>>>> pipeline
>>>> and read from the very beginning of each topic from Kafka. Within
>>>> different
>>>> runs, different days were missing.) I spent the weekend rolling back the
>>>> changes and trying to figure out what was going on.
>>>>
>>>> I didn't see any error in the logs (the log level was on WARNING for
>>>> most
>>>> parts), but I thought, well maybe it's because there are too many
>>>> partitions

Re: Experience with KafkaIO -> BigQueryIO

2018-11-07 Thread Kaymak, Tobias
On Tue, Nov 6, 2018 at 6:06 PM Lukasz Cwik  wrote:

> Since you're using FILE_LOADS as the BigQueryIO write method, did you see a
> file being created for each partitioned table based upon the requested
> triggering frequency?
>
> Figuring out whether the problem was upstream from creating the file or
> downstream after the file was created would help debug this issue.
>
>
That is a very good point! I have to admit that it's not so easy for me to
figure out post mortem whether this happened for each one. As every load job
I can see in the logs was successful, I need to dig through the temporary
file structure in the GCS bucket used by BigQueryIO before loading the
tables, but that is quite a challenge.



> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias 
> wrote:
>
>> Hi,
>>
>> I am sharing my experience with you after trying to use the following
>> pipeline
>> logic (with Beam 2.6.0 - running on Flink 1.5):
>>
>> 1. Reading from KafkaIO, attaching a timestamp from each parsed element
>> 2. Filtering bad records
>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch jobs)
>> every 15 minutes
>>
>> I had a working pipeline that does not write to BigQuery directly, but to
>> Cloud Storage, so it's 3rd step was
>>
>> 3. Writing files to GCS in daily "subdirectories"
>>
>> I tried to rewrite the pipeline to reduce complexity: Resetting it's state
>> should no longer be tied to thinking about what to delete on GCS, also
>> configurable refresh times directly from within the Pipeline was
>> something I
>> was looking for. The thing that I needed to change was the output in the
>> end,
>> so knew my parsing logic would not change and that should reduce the
>> risk.
>>
>> I tested the pipeline within our testcluster and it looked promising.
>> When I
>> deployed it last week everything seemed to go smoothly. On Friday I
>> noticed
>> that I had holes in the data: in the BigQuery tables there were missing
>> days
>> (tricky was that the recent days looked fine). (To be sure I reset the
>> pipeline
>> and read from the very beginning of each topic from Kafka. Within
>> different
>> runs, different days were missing.) I spent the weekend rolling back the
>> changes and trying to figure out what was going on.
>>
>> I didn't see any error in the logs (the log level was on WARNING for most
>> parts), but I thought, well maybe it's because there are too many
>> partitions
>> and BigQuery has a limit of 1000 partition operations per day. So I
>> started
>> reading from just 90 days in the past, but I still had holes (whole days).
>>
>> I had a windowing step that I needed for the GCS pipeline, I became aware
>> that I
>> wouldn't need this anymore with BigQueryIO so I commented it out and
>> tested
>> again, without luck.
>>
>> What struck me was that the Flink Cluster didn't do any checkpoints for
>> the
>> pipeline that was using BigQueryIO - it does so when writing to GCS and I
>> tested it's failure logic there. Additionally the graph in Flink with
>> BigQueryIO becomes very complex, but this is something I expected.
>>
>> Here is the Pipeline code with the commented out windowing part:
>>
>>   pipeline
>> .apply(
>> KafkaIO.read()
>> .withBootstrapServers(bootstrap)
>> .withTopics(topics)
>> .withKeyDeserializer(StringDeserializer.class)
>> .withValueDeserializer(ConfigurableDeserializer.class)
>> .updateConsumerProperties(
>>
>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>> inputMessagesConfig))
>>
>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>> .updateConsumerProperties(ImmutableMap.of("group.id",
>> "di-beam-consumers"))
>>
>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>> .withTimestampPolicyFactory(
>> TimestampPolicyFactory.withTimestampFn(
>> new
>> MessageTimestampExtractor(inputMessagesConfig)))
>> .withReadCommitted()
>> .commitOffsetsInFinalize())
>> .apply(ParDo.of(new ToEventFn()))
>> //.apply(
>> //Window.into(new ZurichTimePartitioningWindowFn())
>> //.triggering(
>> //   

Re: Experience with KafkaIO -> BigQueryIO

2018-11-07 Thread Kaymak, Tobias
On Tue, Nov 6, 2018 at 6:58 PM Raghu Angadi  wrote:

> You seem to be reading from multiple topics and your timestamp policy is
> too simplistic (withTimestampFn() was never meant to be public API, I am
> sending a PR to deprecate it first and then will make it package private).
> So if you have two topics of different sizes, the smaller topic might be
> consumed really fast, pushing your watermark way ahead. This might or might
> not be happening, but this is one of the dangers of using record timestamp
> for watermark (we should never do that).
>
>
To clarify: The test was done consuming from a single topic. I am using a
field inside the element's JSON to get the element's timestamp. Data in a
topic can go way back to, let's say, 2017, but that data was pushed to Kafka
in one go, and the timestamp when it arrived is, for example, Wednesday last
week. Sometimes the producing side does not set the element's timestamp for
Kafka (since it's using a library that does not support that yet), so it has
to be parsed.

I could also not fiddle with the timestamp at all, let the system decide,
and then parse it in the BigQueryIO partitioning step and assign it to a
partition. Is this better?



> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias 
> wrote:
>
>> Hi,
>>
>> I am sharing my experience with you after trying to use the following
>> pipeline
>> logic (with Beam 2.6.0 - running on Flink 1.5):
>>
>> 1. Reading from KafkaIO, attaching a timestamp from each parsed element
>> 2. Filtering bad records
>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch jobs)
>> every 15 minutes
>>
>> I had a working pipeline that does not write to BigQuery directly, but to
>> Cloud Storage, so it's 3rd step was
>>
>> 3. Writing files to GCS in daily "subdirectories"
>>
>> I tried to rewrite the pipeline to reduce complexity: Resetting it's state
>> should no longer be tied to thinking about what to delete on GCS, also
>> configurable refresh times directly from within the Pipeline was
>> something I
>> was looking for. The thing that I needed to change was the output in the
>> end,
>> so knew my parsing logic would not change and that should reduce the
>> risk.
>>
>> I tested the pipeline within our testcluster and it looked promising.
>> When I
>> deployed it last week everything seemed to go smoothly. On Friday I
>> noticed
>> that I had holes in the data: in the BigQuery tables there were missing
>> days
>> (tricky was that the recent days looked fine). (To be sure I reset the
>> pipeline
>> and read from the very beginning of each topic from Kafka. Within
>> different
>> runs, different days were missing.) I spent the weekend rolling back the
>> changes and trying to figure out what was going on.
>>
>> I didn't see any error in the logs (the log level was on WARNING for most
>> parts), but I thought, well maybe it's because there are too many
>> partitions
>> and BigQuery has a limit of 1000 partition operations per day. So I
>> started
>> reading from just 90 days in the past, but I still had holes (whole days).
>>
>> I had a windowing step that I needed for the GCS pipeline, I became aware
>> that I
>> wouldn't need this anymore with BigQueryIO so I commented it out and
>> tested
>> again, without luck.
>>
>> What struck me was that the Flink Cluster didn't do any checkpoints for
>> the
>> pipeline that was using BigQueryIO - it does so when writing to GCS and I
>> tested it's failure logic there. Additionally the graph in Flink with
>> BigQueryIO becomes very complex, but this is something I expected.
>>
>> Here is the Pipeline code with the commented out windowing part:
>>
>>   pipeline
>> .apply(
>> KafkaIO.read()
>> .withBootstrapServers(bootstrap)
>> .withTopics(topics)
>> .withKeyDeserializer(StringDeserializer.class)
>> .withValueDeserializer(ConfigurableDeserializer.class)
>> .updateConsumerProperties(
>>
>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>> inputMessagesConfig))
>>
>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>> .updateConsumerProperties(ImmutableMap.of("group.id",
>> "di-beam-consumers"))
>>
>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>

Experience with KafkaIO -> BigQueryIO

2018-11-06 Thread Kaymak, Tobias
Hi,

I am sharing my experience with you after trying to use the following
pipeline
logic (with Beam 2.6.0 - running on Flink 1.5):

1. Reading from KafkaIO, attaching a timestamp from each parsed element
2. Filtering bad records
3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch jobs)
every 15 minutes

I had a working pipeline that does not write to BigQuery directly, but to
Cloud Storage, so its 3rd step was

3. Writing files to GCS in daily "subdirectories"

I tried to rewrite the pipeline to reduce complexity: Resetting its state
should no longer be tied to thinking about what to delete on GCS. Refresh
times configurable directly from within the pipeline were also something I
was looking for. The only thing that I needed to change was the output at
the end, so I knew my parsing logic would not change, and that should reduce
the risk.

I tested the pipeline within our test cluster and it looked promising. When I
deployed it last week everything seemed to go smoothly. On Friday I noticed
that I had holes in the data: in the BigQuery tables there were missing days
(the tricky part was that the recent days looked fine). To be sure, I reset
the pipeline and read from the very beginning of each topic from Kafka.
Across different runs, different days were missing. I spent the weekend
rolling back the changes and trying to figure out what was going on.

I didn't see any error in the logs (the log level was on WARNING for most
parts), but I thought, well maybe it's because there are too many partitions
and BigQuery has a limit of 1000 partition operations per day. So I started
reading from just 90 days in the past, but I still had holes (whole days).

I had a windowing step that I needed for the GCS pipeline. I became aware
that I wouldn't need it anymore with BigQueryIO, so I commented it out and
tested again, without luck.

What struck me was that the Flink cluster didn't do any checkpoints for the
pipeline that was using BigQueryIO - it does so when writing to GCS, and I
tested its failure logic there. Additionally, the graph in Flink with
BigQueryIO becomes very complex, but this is something I expected.

Here is the Pipeline code with the commented out windowing part:

  pipeline
      .apply(
          KafkaIO.read()
              .withBootstrapServers(bootstrap)
              .withTopics(topics)
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializer(ConfigurableDeserializer.class)
              .updateConsumerProperties(
                  ImmutableMap.of(
                      InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
              .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
              .updateConsumerProperties(ImmutableMap.of("group.id", "di-beam-consumers"))
              .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
              .withTimestampPolicyFactory(
                  TimestampPolicyFactory.withTimestampFn(
                      new MessageTimestampExtractor(inputMessagesConfig)))
              .withReadCommitted()
              .commitOffsetsInFinalize())
      .apply(ParDo.of(new ToEventFn()))
      //.apply(
      //    Window.into(new ZurichTimePartitioningWindowFn())
      //        .triggering(
      //            Repeatedly.forever(
      //                AfterFirst.of(
      //                    AfterPane.elementCountAtLeast(bundleSize),
      //                    AfterProcessingTime.pastFirstElementInPane()
      //                        .plusDelayOf(refreshFrequency))))
      //        .withAllowedLateness(Duration.standardDays(1))
      //        .discardingFiredPanes())
      .apply(
          BigQueryIO.write()
              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
              .withTriggeringFrequency(refreshFrequency)
              .withNumFileShards(1)
              .to(partitionedTableDynamicDestinations)
              .withFormatFunction(
                  (SerializableFunction) KafkaToBigQuery::convertUserEventToTableRow)
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));


I have the feeling that I must be making some serious and dumb mistake, as I
know the Beam framework is very robust. Thanks for taking the time to read
this.

Tobi


Re: KafkaIO - Deadletter output

2018-10-29 Thread Kaymak, Tobias
t 24, 2018 at 4:21 PM Lukasz Cwik  wrote:
>>>>>>
>>>>>>> Yes, that would be fine.
>>>>>>>
>>>>>>> The user could then use a ParDo which outputs to a DLQ for things it
>>>>>>> can't parse the timestamp for and use outputWithTimestamp[1] for 
>>>>>>> everything
>>>>>>> else.
>>>>>>>
>>>>>>> 1:
>>>>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks. So returning min timestamp is OK, right (assuming the
>>>>>>>> application is fine with what it means)?
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> All records in Apache Beam have a timestamp. The default timestamp
>>>>>>>>> is the min timestamp defined here:
>>>>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> You would have to return min timestamp for all records otherwise
>>>>>>>>>>> the watermark may have advanced and you would be outputting records 
>>>>>>>>>>> that
>>>>>>>>>>> are droppably late.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> That would be fine I guess. What’s the timestamp for a record
>>>>>>>>>> that doesn’t have one?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <
>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>>>>>> should not affect the watermark.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <
>>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
>>>>>>>>>>>>> API, is this pipeline defined under kafkaio package?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark for 
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> UnboundedSource, how would they control the watermark in a 
>>>>>>>>>>>>>> downstream ParDo?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <
>>>>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>>>>

KafkaIO - Deadletter output

2018-10-23 Thread Kaymak, Tobias
Hi,

Is there a way to get a Deadletter Output from a pipeline that uses a
KafkaIO connector for its input? As
`TimestampPolicyFactory.withTimestampFn()` takes only a SerializableFunction
and not a ParDo, how would I be able to produce a Deadletter output from it?

I have the following pipeline defined that reads from a KafkaIO input:

pipeline.apply(
    KafkaIO.read()
        .withBootstrapServers(bootstrap)
        .withTopics(topics)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ConfigurableDeserializer.class)
        .updateConsumerProperties(
            ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
        .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
        .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
        .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
        .withTimestampPolicyFactory(
            TimestampPolicyFactory.withTimestampFn(
                new MessageTimestampExtractor(inputMessagesConfig)))
        .withReadCommitted()
        .commitOffsetsInFinalize())


and I would like to get deadletter outputs when my timestamp extraction fails.

Best,
Tobi
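
The approach suggested in the replies to this question (return the minimum timestamp for records whose timestamp cannot be parsed, then redirect those records in a later step) can be sketched in plain Java as follows. This is not Beam code: the class, the method names and the use of Instant.MIN as the sentinel are assumptions for illustration, standing in for Beam's minimum timestamp and for a ParDo with a dead-letter output. For brevity each record is represented only by its raw timestamp string.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeadLetterSketch {

  /** Stand-in sentinel for "no usable timestamp" (assumption, not Beam's constant). */
  static final Instant MIN_TIMESTAMP = Instant.MIN;

  /** Timestamp extractor that never throws: unparsable input maps to the sentinel. */
  public static Instant extractTimestamp(String raw) {
    if (raw == null) {
      return MIN_TIMESTAMP;
    }
    try {
      return Instant.parse(raw);
    } catch (DateTimeParseException e) {
      return MIN_TIMESTAMP;
    }
  }

  /** Routes by sentinel: index 0 holds the good records, index 1 the dead letters. */
  public static List<List<String>> route(List<String> rawTimestamps) {
    List<String> good = new ArrayList<>();
    List<String> deadLetters = new ArrayList<>();
    for (String raw : rawTimestamps) {
      if (extractTimestamp(raw).equals(MIN_TIMESTAMP)) {
        deadLetters.add(raw);
      } else {
        good.add(raw);
      }
    }
    return Arrays.asList(good, deadLetters);
  }

  public static void main(String[] args) {
    List<List<String>> routed =
        route(Arrays.asList("2018-10-23T10:15:30Z", "not-a-timestamp"));
    System.out.println("good:         " + routed.get(0));
    System.out.println("dead letters: " + routed.get(1));
  }
}
```

The extractor itself stays a pure function, which is all a SerializableFunction can be; the decision about what to do with a bad record is deferred to the step that can actually emit to a second output.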


Re: How to use of BigQueryIO Method.FILE_LOADS when reading from a unbounded source?

2018-10-10 Thread Kaymak, Tobias
Hi Wout,

you are so right - I forgot the --tempLocation= parameter when launching
and after that I also needed to set the number of shards by adding:
.withNumFileShards(1)
Thank you!

Tobi
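
In case it helps someone else: a small pre-flight check along these lines would have caught my missing --tempLocation before the pipeline was constructed. It is only a sketch in plain Java; the class and method names are made up, and it is a looser check than the GcsPath.fromUri call that BatchLoads uses in the snippet quoted below.

```java
import java.net.URI;

public class TempLocationCheck {

  /** True when the value parses as a URI with scheme "gs" and a non-empty bucket part. */
  public static boolean looksLikeGcsPath(String tempLocation) {
    if (tempLocation == null || tempLocation.isEmpty()) {
      return false;
    }
    try {
      URI uri = URI.create(tempLocation);
      return "gs".equals(uri.getScheme())
          && uri.getAuthority() != null
          && !uri.getAuthority().isEmpty();
    } catch (IllegalArgumentException e) {
      // URI.create wraps syntax errors in IllegalArgumentException.
      return false;
    }
  }

  public static void main(String[] args) {
    // Hypothetical bucket name, for illustration only.
    System.out.println(looksLikeGcsPath("gs://my-bucket/tmp"));
    System.out.println(looksLikeGcsPath(null));
  }
}
```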

On Wed, Oct 10, 2018 at 3:23 PM Wout Scheepers <
wout.scheep...@vente-exclusive.com> wrote:

> Hey Tobias,
>
>
> org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
>
>
>
> points to the following code snippet (starting from BatchLoads.java:210) :
>
> if (bigQueryServices == null) {
>   try {
>     GcsPath.fromUri(tempLocation);
>   } catch (IllegalArgumentException e) {
>     throw new IllegalArgumentException(
>         String.format(
>             "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
>             tempLocation),
>         e);
>   }
> }
>
>
>
> Are you sure your tempLocation is set correctly? I guess it’s needed for
> staging a BigQuery load job instead of streaming.
>
>
>
> Wout
>
>
>
>
>
>
>
> *From: *"Kaymak, Tobias" 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Wednesday, 10 October 2018 at 14:18
> *To: *"user@beam.apache.org" 
> *Subject: *How to use of BigQueryIO Method.FILE_LOADS when reading from a
> unbounded source?
>
>
>
> I am trying to read from an unbounded source and using FILE_LOADS instead
> of streaming inserts towards BigQuery.
>
> If I don't have the following two lines
>
>
>
>  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>
>   .withTriggeringFrequency(Duration.standardMinutes(10))
>
>
>
> my code works just fine, but uses streaming inserts. If I add them I get a
> non-specific stacktrace like:
>
>
>
> Exception in thread "main" java.lang.IllegalArgumentException
>
> at
> com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
>
> at
> org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
>
> at
> org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
>
> at
> org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
>
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>
> at
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
>
> at
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
>
> at
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
>
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
>
> at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)
>
>
>
> where line 181 is the first line of the following code excerpt:
>
>
>
> BigQueryIO.write()
>
>   .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>
>   .withTriggeringFrequency(Duration.standardMinutes(10))
>
>
>
>   .to(
>
>   new DynamicDestinations() {
>
> @Override
>
> public String getDestination(ValueInSingleWindow element) {
>
>   return element.getValue().getTopic();
>
> }
>
>
>
> @Override
>
> public TableDestination getTable(String destination) {
>
>   return new TableDestination(
>
>   "charged-dialect-824:KafkaStaging.test", null, new
> TimePartitioning().setType("DAY"));
>
> }
>
>
>
> @Override
>
> public TableSchema getSchema(String destination) {
>
>   return inputMessagesConfig.getTableSchema(destination);
>
> }
>
>   })
>
>   .withFormatFunction(
>
>   (SerializableFunction)
>
>   event -> convertUserEventToTableRow(event))
>
>
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>
>   .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>
>
>
> I am not sure what I am doing wrong here, I tried higher values for the
> Duration, but that didn't help. I wasn't able to find the root
>
> cause for the exception with the debugger, any idea how I can get to the
> bottom of this?
>
> Tobi
>
>
>


-- 
Tobias Kaymak
Data Engineer

tobias.kay...@ricardo.ch
www.ricardo.ch


How to use of BigQueryIO Method.FILE_LOADS when reading from a unbounded source?

2018-10-10 Thread Kaymak, Tobias
I am trying to read from an unbounded source and use FILE_LOADS instead
of streaming inserts to write to BigQuery.

If I don't have the following two lines

 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
  .withTriggeringFrequency(Duration.standardMinutes(10))

my code works just fine, but uses streaming inserts. If I add them I get a
non-specific stacktrace like:

Exception in thread "main" java.lang.IllegalArgumentException
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
    at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)

where line 181 is the first line of the following code excerpt:

BigQueryIO.write()
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(10))
    .to(
        new DynamicDestinations() {
          @Override
          public String getDestination(ValueInSingleWindow element) {
            return element.getValue().getTopic();
          }

          @Override
          public TableDestination getTable(String destination) {
            return new TableDestination(
                "charged-dialect-824:KafkaStaging.test",
                null,
                new TimePartitioning().setType("DAY"));
          }

          @Override
          public TableSchema getSchema(String destination) {
            return inputMessagesConfig.getTableSchema(destination);
          }
        })
    .withFormatFunction(
        (SerializableFunction) event -> convertUserEventToTableRow(event))
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

I am not sure what I am doing wrong here. I tried higher values for the
Duration, but that didn't help. I wasn't able to find the root cause of the
exception with the debugger. Any idea how I can get to the bottom of this?

Tobi