Re: PubsubUnboundedSink parameters on Dataflow runner

2020-05-19 Thread Marcin Kuthan
On Tue, 19 May 2020 at 17:40, Reuven Lax  wrote:

>
> On Tue, May 19, 2020 at 4:14 AM Marcin Kuthan 
> wrote:
>
>> I'm looking for the Pubsub publication details on unbounded collections
>> when Dataflow runner is used and streaming engine is on.
>>
>> As I understood correctly the PubsubUnboundedSink transform is overridden
>> by internal implementation.
>>
>>
>> https://lists.apache.org/thread.html/26e2bfdb6eaa7319ea3cc65f9d8a0bfeb7be6a6d88f0167ebad0591d%40%3Cuser.beam.apache.org%3E
>>
>
> Only the streaming runner.
>

Indeed! After your response I
checked "org.apache.beam.runners.dataflow.DataflowRunner" again and the
internal implementation is used only if streaming is on. Thanks for
pointing that out.
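(For reference, a minimal sketch of the options under which that override applies, assuming a Dataflow job; the enable_streaming_engine experiment shown here is the usual way to turn on Streaming Engine, but the exact flag may differ between Beam versions.)

import java.util.Collections;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StreamingEngineOptions {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);
    // DataflowRunner swaps in its internal Pubsub sink only when streaming is enabled.
    options.setStreaming(true);
    // Streaming Engine is typically switched on via this experiment flag.
    options.setExperiments(Collections.singletonList("enable_streaming_engine"));
  }
}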


>
>>
>>
>> Questions:
>>
>> 1. Should I expect that parameters maxBatchByteSize, batchSize are
>> respected, or Dataflow internal implementation just ignores them?
>>
>
> I don't think that Dataflow pays attention to this.
>

It's very tricky to figure out where the boundary between Apache Beam and the
Dataflow runner lies. Parameters like maxBatchByteSize and batchSize look like
regular Apache Beam tuning knobs. Are you aware of any Dataflow
documentation where I could find more information on how Dataflow handles
Apache Beam calls, and which settings are respected and which are not?

>
>
>> 4. How to estimate streaming engine costs for internal shuffling in
>> PubsubUnboundedSink, if any? The default PubsubUnboundedSink implementation
>> shuffles data before publication but I don't know how it is done by
>> internal implementation. And I don't need to know, as long as it does not
>> generate extra costs :)
>>
>
> The internal implementation does not add any extra cost. Dataflow charges
> for every MB read from Streaming Engine as a "shuffle" charge, and this
> includes the records read from PubSub. The external Beam implementation
> includes a full shuffle, which would be more expensive as it includes both
> a write and a read.
>
>
Very interesting! Please correct me if my understanding is wrong:

1. The worker harness does not pull data directly from Pubsub. Data is
effectively pulled by the streaming engine and the worker harness reads the
data from the streaming engine. Who is responsible for ack-ing the messages?
2. The worker harness does not publish data directly to Pubsub because the
data from the previous pipeline step is already in the streaming engine.
The data is published to Pubsub directly by the streaming engine.

Thank you Reuven for your time and for sharing Dataflow and Streaming
Engine details.

>


Webinar 3: An introduction to the Spark runner for Apache Beam

2020-05-19 Thread Aizhamal Nurmamat kyzy
Hi again!

As it is becoming a tradition for this month, I am sending you a reminder
about tomorrow's webinar on 'Introduction to the Spark Runner for Apache
Beam' by our dear Ismael Mejia. We start tomorrow at 10:00am PST/ 5:00pm
GMT/1:00pm EST.

You can join by signing up here:
https://learn.xnextcon.com/event/eventdetails/W20052010

If you cannot get into the meeting room on Zoom, you can go to this YouTube
channel for the same livestream, but we encourage attendees to join Zoom to
be able to ask speakers questions.

The webinar will be recorded and posted on Beam's YT channel later on Wed,
and all the resources used during the presentation will be shared on this
repo: https://github.com/aijamalnk/beam-learning-month/blob/master/README.md

Thanks,
Aizhamal


Best approach for Sql queries in Beam

2020-05-19 Thread bharghavi vajrala
Hi All,

Need your inputs on the scenario below:

Source: Kafka (the actual source is an Oracle DB; data is pushed to Kafka)
SDK: Java
Runner: Flink
Problem: Subscribe to 5 topics (tables), join them on different keys, and
group by a few columns.
Existing solution: Using a session window of 20 seconds, with a separate
transform for every 2 queries, and using the result.

Below is the sample code:

Sessions sessionWindow =
    Sessions.withGapDuration(Duration.standardSeconds(Integer.parseInt("20")));

PCollection<Row> stream1 =
    PCollectionTuple
        .of(new TupleTag<>("TABLE1"), rows1)
        .and(new TupleTag<>("TABLE2"), rows2)
        .apply("rows1-rows2", SqlTransform.query(
            "select t1.col1, t1.col2, t2.col5 from " +
            "TABLE1 t1 join TABLE2 t2 " +
            "on t1.col5 = t2.col7"))
        .apply("window", Window.<Row>into(sessionWindow));

PCollection<Row> mergedStream =
    PCollectionTuple
        .of(new TupleTag<>("MERGE-TABLE"), stream1)
        .apply("merge", SqlTransform.query(
            "select col1, col2,\n" +
            "max(case when col3='D' then col8 end) as D_col3,\n" +
            "max(case when col3='C' then col8 end) as C_col3,\n" +
            "max(case when col6='CP' then col10 end) as CP_col6,\n" +
            "max(case when col6='DP' then col10 end) as DP_col6\n" +
            "from MERGE-TABLE " +
            "group by col1, col2"))
        .apply("merge-window", Window.<Row>into(sessionWindow));

PCollection<Row> stream2 =
    PCollectionTuple
        .of(new TupleTag<>("TABLE3"), mergedStream)
        .and(new TupleTag<>("TABLE4"), stream22)
        .apply(SqlTransform.query(
            "select distinct c1, c2, c4 from " +
            "TABLE3 d1 join TABLE4 d2\n" +
            "on d1.num = d2.tr_num"))
        .apply("e-window", Window.<Row>into(sessionWindow));


Is there any better approach?

Looking forward to your suggestions.

Thanks!!



Re: PubsubUnboundedSink parameters on Dataflow runner

2020-05-19 Thread Reuven Lax
On Tue, May 19, 2020 at 4:14 AM Marcin Kuthan 
wrote:

> I'm looking for the Pubsub publication details on unbounded collections
> when Dataflow runner is used and streaming engine is on.
>
> As I understood correctly the PubsubUnboundedSink transform is overridden
> by internal implementation.
>
>
> https://lists.apache.org/thread.html/26e2bfdb6eaa7319ea3cc65f9d8a0bfeb7be6a6d88f0167ebad0591d%40%3Cuser.beam.apache.org%3E
>

Only the streaming runner.


>
>
> Questions:
>
> 1. Should I expect that parameters maxBatchByteSize, batchSize are
> respected, or Dataflow internal implementation just ignores them?
>

I don't think that Dataflow pays attention to this.


> 2. What about pubsubClientFactory? The default one is
> PubsubJsonClientFactory, and this is somehow important if I want to
> configure maxBatchByteSize under Pubsub 10MB limit. Json factory encodes
> messages using base64, so the limit should be lowered to 10MB * 0.75 (minus
> some safety margin).
>

Similarly, this has no meaning when using Dataflow streaming.
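(A small editorial illustration of the base64 headroom calculation mentioned above, relevant only where the Beam PubsubUnboundedSink JSON client is actually used; the 5% safety margin is an arbitrary example value, not a recommendation.)

public class PubsubBatchSizing {
  public static void main(String[] args) {
    // Pub/Sub caps a publish request at 10 MB.
    long requestLimitBytes = 10L * 1024 * 1024;
    // Base64 inflates payloads by 4/3, so the raw payload budget is ~3/4 of the limit.
    long rawBudgetBytes = requestLimitBytes * 3 / 4;
    // Arbitrary 5% margin for attributes and the JSON envelope.
    long maxBatchByteSize = (long) (rawBudgetBytes * 0.95);
    System.out.println("maxBatchByteSize ~= " + maxBatchByteSize + " bytes");
  }
}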


> 3. Should I expect any differences for bounded and unbounded collections?
> There are different defaults in the Beam code: e.g: maxBatchByteSize is
> ~7.5MB for bounded and ~400kB for unbounded collections, batchSize is 100
> for bounded, and 1000 for unbounded. I also don't understand the reasons
> behind default settings.
> 4. How to estimate streaming engine costs for internal shuffling in
> PubsubUnboundedSink, if any? The default PubsubUnboundedSink implementation
> shuffles data before publication but I don't know how it is done by
> internal implementation. And I don't need to know, as long as it does not
> generate extra costs :)
>

The internal implementation does not add any extra cost. Dataflow charges
for every MB read from Streaming Engine as a "shuffle" charge, and this
includes the records read from PubSub. The external Beam implementation
includes a full shuffle, which would be more expensive as it includes both
a write and a read.
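(A back-of-the-envelope sketch of that billing model; the volume and per-GB rate below are placeholders, not real Dataflow pricing, and whether the shuffle write and read are billed identically is a guess.)

public class StreamingEngineCostSketch {
  public static void main(String[] args) {
    double gbReadFromStreamingEngine = 500.0; // placeholder monthly volume
    double pricePerGb = 0.01;                 // placeholder rate, check current pricing
    // Internal sink: Pub/Sub records are read once from Streaming Engine.
    double internalSinkCharge = gbReadFromStreamingEngine * pricePerGb;
    // External Beam sink: adds a full shuffle (a write plus a read) of roughly the same data.
    double externalSinkCharge = internalSinkCharge + 2 * gbReadFromStreamingEngine * pricePerGb;
    System.out.println("internal ~$" + internalSinkCharge + ", external ~$" + externalSinkCharge);
  }
}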



> Many questions about Dataflow internals but it would be nice to know some
> details, the details important from the performance and costs perspective.
>
> Thanks,
> Marcin
>


Re: How Beam coders match with runner serialization

2020-05-19 Thread Ivan San Jose
OK, sorry, but now I'm totally lost...

Then, as you said "Beam coders are wrapped in Flink's TypeSerializers",
Beam is not doing the serialization/deserialization itself but delegating
it to Flink's TypeSerializers, right?

Now let me explain my problem better...

We have a Beam pipeline using Flink as the runner. We've enabled
checkpointing and added a field (with a default value) to a Java model
class called "internal.boot.hbase.HBaseRow". Then, after restarting the
pipeline with that change, the following error was thrown:

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createOperatorStateBackend(FsStateBackend.java:544)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 8 more
Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
    at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 12 more
Caused by: java.io.InvalidClassException: internal.boot.hbase.HBaseRow; local class incompatible: stream classdesc serialVersionUID = 3720984101010230366, local class serialVersionUID = 5344777628021899455
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1715)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1555)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
    at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
    at org.apach
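(Editorial note on the InvalidClassException above: that particular failure is the generic Java-serialization serialVersionUID mismatch. Pinning an explicit serialVersionUID on the model class lets compatible changes, such as adding a field, deserialize older data; it does not by itself make Beam/Flink serializer snapshots evolvable. A minimal sketch with invented fields standing in for internal.boot.hbase.HBaseRow.)

import java.io.Serializable;

// Hypothetical stand-in for internal.boot.hbase.HBaseRow.
public class HBaseRow implements Serializable {
  // With a pinned UID the stream stays readable after compatible changes;
  // without it the JVM recomputes the UID and throws InvalidClassException.
  private static final long serialVersionUID = 1L;

  private String rowKey;
  private long timestamp;

  // Field added later; when old data is deserialized it is left at the
  // JVM default (null here), since field initializers do not run then.
  private String addedField = "default";
}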

Re: Running NexMark Tests

2020-05-19 Thread Sruthi Sree Kumar
PR for the update: https://github.com/apache/beam/pull/11751

Regards,
Sruthi

On Tue, May 19, 2020 at 3:51 PM Maximilian Michels  wrote:

> Looks like an accidental change to me. Running with either version, 1.9
> or 1.10 works, but this should be changed back to using the latest version.
>
> Do you mind creating a PR?
>
> Thanks,
> Max
>
> On 19.05.20 13:02, Sruthi Sree Kumar wrote:
> > On the documentation, the version of Flink runner is changed to 1.9
> > which was 1.10(latest)
> > before
> https://github.com/apache/beam/commit/1d2700818474c008eaa324ac1b5c49c9d2857298#diff-0e75160f4b09a1a300671557930589d9
> .
> >
> > Is this an accidental change or is there any particular reason for this
> > downgrade of version?
> >
> > Regards,
> > Sruthi
> >
> > On Tue, May 12, 2020 at 7:21 PM Maximilian Michels  > > wrote:
> >
> > A heads-up if anybody else sees this, we have removed the flag:
> > https://jira.apache.org/jira/browse/BEAM-9900
> >
> > Further contributions are very welcome :)
> >
> > -Max
> >
> > On 11.05.20 17:05, Sruthi Sree Kumar wrote:
> > > I have opened a PR with the documentation change.
> > > https://github.com/apache/beam/pull/11662
> > >
> > > Regards,
> > > Sruthi
> > >
> > > On 2020/04/21 20:22:17, Ismaël Mejía  > > wrote:
> > >> You need to instruct the Flink runner to shut down the source,
> > >> otherwise it will stay waiting.
> > >> You can do this by adding the extra
> > >> argument `--shutdownSourcesOnFinalWatermark=true`
> > >> And if that works and you want to open a PR to update our
> > >> documentation that would be greatly appreciated.
> > >>
> > >> Regards,
> > >> Ismaël
> > >>
> > >>
> > >> On Tue, Apr 21, 2020 at 10:04 PM Sruthi Sree Kumar
> > >>  > > wrote:
> > >>>
> > >>> Hello,
> > >>>
> > >>> I am trying to run nexmark queries using flink runner streaming.
> > Followed the documentation and used the command
> > >>> ./gradlew :sdks:java:testing:nexmark:run \
> > >>>
> > >>> -Pnexmark.runner=":runners:flink:1.10" \
> > >>> -Pnexmark.args="
> > >>> --runner=FlinkRunner
> > >>> --suite=SMOKE
> > >>> --streamTimeout=60
> > >>> --streaming=true
> > >>> --manageResources=false
> > >>> --monitorJobs=true
> > >>> --flinkMaster=[local]"
> > >>>
> > >>>
> > >>> But after the events are read from the source, there is no
> > further progress and the job is always stuck at 99%. Is there any
> > configuration that I am missing?
> > >>>
> > >>> Regards,
> > >>> Sruthi
> > >>
> >
>


Re: How Beam coders match with runner serialization

2020-05-19 Thread Maximilian Michels
Hi Ivan,

Beam does not use Java serialization for checkpoint data. It uses Beam
coders which are wrapped in Flink's TypeSerializers. That said, Beam
does not support serializer migration yet.
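(Editorial aside: one way to keep element data off Java serialization is to register an explicit, reflection-based Avro coder for the model class instead of relying on SerializableCoder. A minimal sketch with a hypothetical MyRecord POJO; whether this helps across versions still depends on the coder-compatibility caveat below.)

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ExplicitCoderExample {

  // Hypothetical POJO standing in for the checkpointed model class.
  public static class MyRecord {
    public String key;
    public long value;
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // PCollections of MyRecord are now encoded with Avro (schema-based)
    // rather than falling back to SerializableCoder (Java serialization).
    pipeline.getCoderRegistry()
        .registerCoderForClass(MyRecord.class, AvroCoder.of(MyRecord.class));

    // Alternatively, set the coder on a single PCollection:
    //   records.setCoder(AvroCoder.of(MyRecord.class));
  }
}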

I'm curious, what do you consider a "backwards-compatible" change? If
you are attempting to upgrade the snapshot from one Beam version to
another, there is a high chance this won't work due to coder changes
introduced between two Beam releases.

Best,
Max

PS:
This blog post might shed some more light on the matter:
https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html

On 19.05.20 13:45, Ivan San Jose wrote:
> Hi, I've been started to use Apache Beam not so long ago (so bear with
> me please) and I have a question about Beam coders and how are they
> related with under the hood runner serializer... Let me explain myself
> better:
> 
> As far I've understood from Beam documentation, coders are being used
> in order to describe how PCollection elements are going to be
> serialized/deserialized when needed. Is that correct?
> 
> In the other hand, we have the runner, which is executing the
> application under the hood, and needs to send elements among different
> workers, drivers, whatever other thing... And it seems is using its
> proper serializer, not Beam coder defined for that type. Is that
> correct?
> 
> I'm asking this because, in our case, we are using Flink, and we are
> having issues trying to restore checkpoints because it seems to be
> using standard Java serialization.
> 
> So, first thing we would like to know is how coders are related with
> runner's serializer.. Is Beam serializing elements and then the runner
> is serializing again the result or what?
> 
> And the other question is how we could get rid of Java serializer when
> using Flink as runner, because we are getting following error when
> trying to restore a checkpoint with backward compatible changes:
> 
> Could not Java-deserialize TypeSerializer while restoring checkpoint
> metadata for serializer snapshot
> 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$Le
> gacySnapshot'. Please update to the TypeSerializerSnapshot interface
> that removes Java Serialization to avoid this problem in the future.
> 
> But we don't know how to tell Beam to tell Flink to use
> TypeSerializerSnapshot as serializer, to be honest.
> 
> Thank you. Regards
> 


Re: Running NexMark Tests

2020-05-19 Thread Maximilian Michels
Looks like an accidental change to me. Running with either version, 1.9
or 1.10 works, but this should be changed back to using the latest version.

Do you mind creating a PR?

Thanks,
Max

On 19.05.20 13:02, Sruthi Sree Kumar wrote:
> On the documentation, the version of Flink runner is changed to 1.9
> which was 1.10(latest)
> before 
> https://github.com/apache/beam/commit/1d2700818474c008eaa324ac1b5c49c9d2857298#diff-0e75160f4b09a1a300671557930589d9.
> 
> Is this an accidental change or is there any particular reason for this
> downgrade of version?
> 
> Regards,
> Sruthi
> 
> On Tue, May 12, 2020 at 7:21 PM Maximilian Michels  > wrote:
> 
> A heads-up if anybody else sees this, we have removed the flag:
> https://jira.apache.org/jira/browse/BEAM-9900
> 
> Further contributions are very welcome :)
> 
> -Max
> 
> On 11.05.20 17:05, Sruthi Sree Kumar wrote:
> > I have opened a PR with the documentation change.
> > https://github.com/apache/beam/pull/11662
> >
> > Regards,
> > Sruthi
> >
> > On 2020/04/21 20:22:17, Ismaël Mejía  > wrote:
> >> You need to instruct the Flink runner to shut down the source,
> >> otherwise it will stay waiting.
> >> You can do this by adding the extra
> >> argument `--shutdownSourcesOnFinalWatermark=true`
> >> And if that works and you want to open a PR to update our
> >> documentation that would be greatly appreciated.
> >>
> >> Regards,
> >> Ismaël
> >>
> >>
> >> On Tue, Apr 21, 2020 at 10:04 PM Sruthi Sree Kumar
> >>  > wrote:
> >>>
> >>> Hello,
> >>>
> >>> I am trying to run nexmark queries using flink runner streaming.
> Followed the documentation and used the command
> >>> ./gradlew :sdks:java:testing:nexmark:run \
> >>>
> >>>     -Pnexmark.runner=":runners:flink:1.10" \
> >>>     -Pnexmark.args="
> >>>         --runner=FlinkRunner
> >>>         --suite=SMOKE
> >>>         --streamTimeout=60
> >>>         --streaming=true
> >>>         --manageResources=false
> >>>         --monitorJobs=true
> >>>         --flinkMaster=[local]"
> >>>
> >>>
> >>> But after the events are read from the source, there is no
> further progress and the job is always stuck at 99%. Is there any
> configuration that I am missing?
> >>>
> >>> Regards,
> >>> Sruthi
> >>
> 


How Beam coders match with runner serialization

2020-05-19 Thread Ivan San Jose
Hi, I started using Apache Beam not long ago (so bear with me, please) and
I have a question about Beam coders and how they relate to the runner's
serializer under the hood... Let me explain myself better:

As far as I understand from the Beam documentation, coders are used to
describe how PCollection elements are serialized/deserialized when needed.
Is that correct?
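(Editorial aside: that understanding is correct; a coder is essentially a pair of encode/decode functions over byte streams. A minimal custom coder sketch for a hypothetical Event type, only to illustrate the contract; the element class itself does not need to be Serializable.)

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;

// Hypothetical element type.
class Event {
  final String id;
  final long timestamp;

  Event(String id, long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }
}

// The coder describes exactly how Event is turned into bytes and back,
// independently of whatever serialization the runner uses internally.
class EventCoder extends CustomCoder<Event> {
  private static final StringUtf8Coder STRING = StringUtf8Coder.of();
  private static final VarLongCoder LONG = VarLongCoder.of();

  @Override
  public void encode(Event value, OutputStream out) throws IOException {
    STRING.encode(value.id, out);
    LONG.encode(value.timestamp, out);
  }

  @Override
  public Event decode(InputStream in) throws IOException {
    return new Event(STRING.decode(in), LONG.decode(in));
  }
}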

On the other hand, we have the runner, which executes the application under
the hood and needs to send elements among different workers, drivers, and
so on... And it seems to be using its own serializer, not the Beam coder
defined for that type. Is that correct?

I'm asking this because, in our case, we are using Flink, and we are
having issues trying to restore checkpoints because it seems to be
using standard Java serialization.

So, the first thing we would like to know is how coders relate to the
runner's serializer. Is Beam serializing elements and then the runner
serializing the result again, or what?

And the other question is how we could get rid of the Java serializer when
using Flink as the runner, because we are getting the following error when
trying to restore a checkpoint with backward-compatible changes:

Could not Java-deserialize TypeSerializer while restoring checkpoint
metadata for serializer snapshot
'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'.
Please update to the TypeSerializerSnapshot interface that removes Java
Serialization to avoid this problem in the future.

But we don't know how to tell Beam to tell Flink to use
TypeSerializerSnapshot as the serializer, to be honest.

Thank you. Regards




PubsubUnboundedSink parameters on Dataflow runner

2020-05-19 Thread Marcin Kuthan
I'm looking for details of Pubsub publication for unbounded collections
when the Dataflow runner is used and the streaming engine is on.

If I understand correctly, the PubsubUnboundedSink transform is overridden
by an internal implementation.

https://lists.apache.org/thread.html/26e2bfdb6eaa7319ea3cc65f9d8a0bfeb7be6a6d88f0167ebad0591d%40%3Cuser.beam.apache.org%3E

Questions:

1. Should I expect that parameters maxBatchByteSize, batchSize are
respected, or Dataflow internal implementation just ignores them?
2. What about pubsubClientFactory? The default one is
PubsubJsonClientFactory, and this is somehow important if I want to
configure maxBatchByteSize under Pubsub 10MB limit. Json factory encodes
messages using base64, so the limit should be lowered to 10MB * 0.75 (minus
some safety margin).
3. Should I expect any differences for bounded and unbounded collections?
There are different defaults in the Beam code: e.g: maxBatchByteSize is
~7.5MB for bounded and ~400kB for unbounded collections, batchSize is 100
for bounded, and 1000 for unbounded. I also don't understand the reasons
behind default settings.
4. How to estimate streaming engine costs for internal shuffling in
PubsubUnboundedSink, if any? The default PubsubUnboundedSink implementation
shuffles data before publication but I don't know how it is done by the
internal implementation. And I don't need to know, as long as it does not
generate extra costs :)

A lot of questions about Dataflow internals, but it would be nice to know
the details that matter from a performance and cost perspective.

Thanks,
Marcin


Re: Running NexMark Tests

2020-05-19 Thread Sruthi Sree Kumar
In the documentation, the Flink runner version was changed to 1.9; it was
1.10 (the latest) before
https://github.com/apache/beam/commit/1d2700818474c008eaa324ac1b5c49c9d2857298#diff-0e75160f4b09a1a300671557930589d9
.

Is this an accidental change or is there any particular reason for this
downgrade of version?

Regards,
Sruthi

On Tue, May 12, 2020 at 7:21 PM Maximilian Michels  wrote:

> A heads-up if anybody else sees this, we have removed the flag:
> https://jira.apache.org/jira/browse/BEAM-9900
>
> Further contributions are very welcome :)
>
> -Max
>
> On 11.05.20 17:05, Sruthi Sree Kumar wrote:
> > I have opened a PR with the documentation change.
> > https://github.com/apache/beam/pull/11662
> >
> > Regards,
> > Sruthi
> >
> > On 2020/04/21 20:22:17, Ismaël Mejía  wrote:
> >> You need to instruct the Flink runner to shut down the source,
> >> otherwise it will stay waiting.
> >> You can do this by adding the extra
> >> argument `--shutdownSourcesOnFinalWatermark=true`
> >> And if that works and you want to open a PR to update our
> >> documentation that would be greatly appreciated.
> >>
> >> Regards,
> >> Ismaël
> >>
> >>
> >> On Tue, Apr 21, 2020 at 10:04 PM Sruthi Sree Kumar
> >>  wrote:
> >>>
> >>> Hello,
> >>>
> >>> I am trying to run nexmark queries using flink runner streaming.
> Followed the documentation and used the command
> >>> ./gradlew :sdks:java:testing:nexmark:run \
> >>>
> >>> -Pnexmark.runner=":runners:flink:1.10" \
> >>> -Pnexmark.args="
> >>> --runner=FlinkRunner
> >>> --suite=SMOKE
> >>> --streamTimeout=60
> >>> --streaming=true
> >>> --manageResources=false
> >>> --monitorJobs=true
> >>> --flinkMaster=[local]"
> >>>
> >>>
> >>> But after the events are read from the source, there is no further
> progress and the job is always stuck at 99%. Is there any configuration
> that I am missing?
> >>>
> >>> Regards,
> >>> Sruthi
> >>
>


Re: TextIO. Writing late files

2020-05-19 Thread Maximilian Michels
> This is still confusing to me - why would the messages be dropped as late in 
> this case?

Since you previously mentioned that the bug is due to the pane info
missing, I just pointed out that the WriteFiles logic is expected to
drop the pane info.

@Jose Would it make sense to file a JIRA and summarize all the findings
here?

@Jozef What you describe in
https://www.mail-archive.com/dev@beam.apache.org/msg20186.html is
expected because Flink does not do a GroupByKey on Reshuffle but just
redistributes the elements.

Thanks,
Max

On 18.05.20 21:59, Jose Manuel wrote:
> Hi Reuven, 
> 
> I can try to explain what I guess is happening.
> 
> - There is a source which is reading data entries and updating the
> watermark.
> - Then, data entries are grouped and stored in files.
> - The window information of these data entries is used to emit the
> filenames (the data entries' window and timestamp; the PaneInfo is empty).
> - When a second window is applied to the filenames, if allowed lateness is
> zero or lower than the time spent in the previous reading/writing, the
> filenames are discarded as late.
> 
> I guess, the key is in 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L168
> 
> My assumption is that the global watermark (or source watermark, I am not
> sure about the name) is used to evaluate the filenames, which are in an
> already-emitted window.
> 
> Thanks
> Jose
> 
> 
On Mon, May 18, 2020 at 18:37, Reuven Lax wrote:
> 
> This is still confusing to me - why would the messages be dropped as
> late in this case?
> 
> On Mon, May 18, 2020 at 6:14 AM Maximilian Michels  > wrote:
> 
> All runners which use the Beam reference implementation drop the
> PaneInfo for
> WriteFilesResult#getPerDestinationOutputFilenames(). That's
> why we can observe this behavior not only in Flink but also Spark.
> 
> The WriteFilesResult is returned here:
> 
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363
> 
> GatherBundlesPerWindow will discard the pane information because all
> buffered elements are emitted in the FinishBundle method which
> always
> has a NO_FIRING (unknown) pane info:
> 
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895
> 
> So this seems expected behavior. We would need to preserve the
> panes in
> the Multimap buffer.
> 
> -Max
> 
> On 15.05.20 18:34, Reuven Lax wrote:
> > Lateness should never be introduced inside a pipeline -
> generally late
> > data can only come from a source.  If data was not dropped as late
> > earlier in the pipeline, it should not be dropped after the
> file write.
> > I suspect that this is a bug in how the Flink runner handles the
> > Reshuffle transform, but I'm not sure what the exact bug is.
> >
> > Reuven
> >
> > On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek
> mailto:jozo.vil...@gmail.com>
> > >>
> wrote:
> >
> >     Hi Jose,
> >
> >     thank you for putting the effort to get example which
> >     demonstrate your problem. 
> >
> >     You are using a streaming pipeline and it seems that
> watermark in
> >     downstream already advanced further, so when your File
> pane arrives,
> >     it is already late. Since you define that lateness is not
> tolerated,
> >     it is dropped.
> >     I myself never had requirement to specify zero allowed
> lateness for
> >     streaming. It feels dangerous. Do you have a specific use
> case?
> >     Also, in many cases, after windowed files are written, I
> usually
> >     collect them into global window and specify a different
> triggering
> >     policy for collecting them. Both cases are why I never
> came across
> >     this situation.
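(Editorial aside: a minimal sketch of the pattern described above, re-windowing the emitted filenames into the global window with their own trigger; the Void destination type, i.e. no dynamic destinations, and the one-minute trigger delay are assumptions for illustration.)

import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class CollectFilenames {
  static PCollection<KV<Void, String>> collect(WriteFilesResult<Void> writeResult) {
    return writeResult
        .getPerDestinationOutputFilenames()
        // Gather filenames from all the original windows into the global window
        // and fire periodically instead of relying on the original window/pane.
        .apply("GlobalFilenames",
            Window.<KV<Void, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1))))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());
  }
}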
> >
> >     I do not have an explanation if it is a bug or not. I
> would guess
> >     that watermark can advance further, e.g. because elements
> can be
> >     processed in arbitrary order. Not saying this is the case.
> >     It needs someone with better understanding of how
> watermark advance
> >     is / should be handled within pipelines. 
> >
> >
> >     P.S.: you can add `.withTimestampFn()` to your generate
> sequence, to
> >     get more stable timing, which is