Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-19 Thread Lukasz Cwik
If we don't hear much from users, I would be for merging the change as long
as it is marked @Experimental until we get further feedback on its usage.

On Wed, Sep 19, 2018 at 2:19 PM Jeff Klukas  wrote:

> Thanks for the thoughts, Lukasz. These are exactly the kinds of issues I
> was hoping to get context on, since I don't yet have extensive experience
> with Beam.
>
> I have not yet run into issues where the output coder could not be
> inferred. I expect this may be a non-issue, as the individual transforms
> used within a user-provided lambda expression would presumably expose the
> ability to specify a coder.
>
> I don't have enough context yet to comment on whether display data might
> be an issue, so I do hope the user list can provide input there.
>
> On Wed, Sep 19, 2018 at 4:52 PM Lukasz Cwik  wrote:
>
>> Thanks for the proposal; it does seem to make the API cleaner to build
>> anonymous composite transforms.
>>
>> In your experience, have you had issues where the API doesn't work out
>> well because the PTransform:
>> * is not able to override how the output coder is inferred?
>> * can't supply display data?
>>
>> +user@beam.apache.org , do users think that the
>> provided API would be useful enough for it to be added to the core SDK or
>> would the addition of the method provide noise/detract from the existing
>> API?
>>
>> On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas  wrote:
>>
>>> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on
>>> this suggestion and make it more concrete:
>>>
>>> https://issues.apache.org/jira/browse/BEAM-5413
>>> https://github.com/apache/beam/pull/6414
>>>
>>> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas  wrote:
>>>
 Hello all, I'm a data engineer at Mozilla working on a first project
 using Beam. I've been impressed with the usability of the API as there are
 good built-in solutions for handling many simple transformation cases with
 minimal code, and wanted to discuss one bit of ergonomics that seems to be
 missing.

 It appears that none of the existing PTransform factories are generic
 enough to take in or output a PCollectionTuple, but we've found many use
 cases where it's convenient to apply a few transforms on a PCollectionTuple
 in a lambda expression.

 For example, we've defined several PTransforms that return main and
 error output streams bundled in a PCollectionTuple. We defined a
 CompositeTransform interface so that we could handle the error output in a
 lambda expression like:

 pipeline
     .apply("attempt to deserialize messages",
         new MyDeserializationTransform())
     .apply("write deserialization errors",
         CompositeTransform.of((PCollectionTuple input) -> {
           input.get(errorTag).apply(new MyErrorOutputTransform());
           return input.get(mainTag);
         }))
     .apply("more processing on the deserialized messages",
         new MyOtherTransform());

 I'd be interested in contributing a patch to add this functionality,
 perhaps as a static method PTransform.compose(). Would that patch be
 welcome? Are there other thoughts on naming?

 The full code of the CompositeTransform interface we're currently using
 is included below.


 public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
   OutputT expand(InputT input);

   /**
    * The public factory method that serves as the entrypoint for users
    * to create a composite PTransform.
    */
   static <InputT extends PInput, OutputT extends POutput>
       PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
     return new PTransform<InputT, OutputT>() {
       @Override
       public OutputT expand(InputT input) {
         return transform.expand(input);
       }
     };
   }
 }
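
 For comparison, a rough sketch of how the same error-handling step might
 read with the proposed static factory, assuming the method lands as
 PTransform.compose() per the PR above (errorTag/mainTag are the user's
 TupleTags):

 pipeline
     .apply("attempt to deserialize messages",
         new MyDeserializationTransform())
     .apply("write deserialization errors",
         PTransform.compose((PCollectionTuple input) -> {
           input.get(errorTag).apply(new MyErrorOutputTransform());
           return input.get(mainTag);
         }))
     .apply("more processing on the deserialized messages",
         new MyOtherTransform());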






Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-19 Thread Jeff Klukas
Thanks for the thoughts, Lukasz. These are exactly the kinds of issues I
was hoping to get context on, since I don't yet have extensive experience
with Beam.

I have not yet run into issues where the output coder could not be
inferred. I expect this may be a non-issue, as the individual transforms
used within a user-provided lambda expression would presumably expose the
ability to specify a coder.

I don't have enough context yet to comment on whether display data might be
an issue, so I do hope the user list can provide input there.

On Wed, Sep 19, 2018 at 4:52 PM Lukasz Cwik  wrote:

> Thanks for the proposal; it does seem to make the API cleaner to build
> anonymous composite transforms.
>
> In your experience, have you had issues where the API doesn't work out well
> because the PTransform:
> * is not able to override how the output coder is inferred?
> * can't supply display data?
>
> +user@beam.apache.org , do users think that the
> provided API would be useful enough for it to be added to the core SDK or
> would the addition of the method provide noise/detract from the existing
> API?
>
> On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas  wrote:
>
>> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this
>> suggestion and make it more concrete:
>>
>> https://issues.apache.org/jira/browse/BEAM-5413
>> https://github.com/apache/beam/pull/6414
>>
>> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas  wrote:
>>
>>> Hello all, I'm a data engineer at Mozilla working on a first project
>>> using Beam. I've been impressed with the usability of the API as there are
>>> good built-in solutions for handling many simple transformation cases with
>>> minimal code, and wanted to discuss one bit of ergonomics that seems to be
>>> missing.
>>>
>>> It appears that none of the existing PTransform factories are generic
>>> enough to take in or output a PCollectionTuple, but we've found many use
>>> cases where it's convenient to apply a few transforms on a PCollectionTuple
>>> in a lambda expression.
>>>
>>> For example, we've defined several PTransforms that return main and
>>> error output streams bundled in a PCollectionTuple. We defined a
>>> CompositeTransform interface so that we could handle the error output in a
>>> lambda expression like:
>>>
>>> pipeline
>>>     .apply("attempt to deserialize messages",
>>>         new MyDeserializationTransform())
>>>     .apply("write deserialization errors",
>>>         CompositeTransform.of((PCollectionTuple input) -> {
>>>           input.get(errorTag).apply(new MyErrorOutputTransform());
>>>           return input.get(mainTag);
>>>         }))
>>>     .apply("more processing on the deserialized messages",
>>>         new MyOtherTransform());
>>>
>>> I'd be interested in contributing a patch to add this functionality,
>>> perhaps as a static method PTransform.compose(). Would that patch be
>>> welcome? Are there other thoughts on naming?
>>>
>>> The full code of the CompositeTransform interface we're currently using
>>> is included below.
>>>
>>>
>>> public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
>>>   OutputT expand(InputT input);
>>>
>>>   /**
>>>    * The public factory method that serves as the entrypoint for users
>>>    * to create a composite PTransform.
>>>    */
>>>   static <InputT extends PInput, OutputT extends POutput>
>>>       PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
>>>     return new PTransform<InputT, OutputT>() {
>>>       @Override
>>>       public OutputT expand(InputT input) {
>>>         return transform.expand(input);
>>>       }
>>>     };
>>>   }
>>> }
>>>
>>>
>>>
>>>


Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-19 Thread Lukasz Cwik
Thanks for the proposal; it does seem to make the API cleaner to build
anonymous composite transforms.

In your experience, have you had issues where the API doesn't work out well
because the PTransform:
* is not able to override how the output coder is inferred?
* can't supply display data?

+user@beam.apache.org , do users think that the
provided API would be useful enough for it to be added to the core SDK or
would the addition of the method provide noise/detract from the existing
API?

On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas  wrote:

> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this
> suggestion and make it more concrete:
>
> https://issues.apache.org/jira/browse/BEAM-5413
> https://github.com/apache/beam/pull/6414
>
> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas  wrote:
>
>> Hello all, I'm a data engineer at Mozilla working on a first project
>> using Beam. I've been impressed with the usability of the API as there are
>> good built-in solutions for handling many simple transformation cases with
>> minimal code, and wanted to discuss one bit of ergonomics that seems to be
>> missing.
>>
>> It appears that none of the existing PTransform factories are generic
>> enough to take in or output a PCollectionTuple, but we've found many use
>> cases where it's convenient to apply a few transforms on a PCollectionTuple
>> in a lambda expression.
>>
>> For example, we've defined several PTransforms that return main and error
>> output streams bundled in a PCollectionTuple. We defined a
>> CompositeTransform interface so that we could handle the error output in a
>> lambda expression like:
>>
>> pipeline
>>     .apply("attempt to deserialize messages",
>>         new MyDeserializationTransform())
>>     .apply("write deserialization errors",
>>         CompositeTransform.of((PCollectionTuple input) -> {
>>           input.get(errorTag).apply(new MyErrorOutputTransform());
>>           return input.get(mainTag);
>>         }))
>>     .apply("more processing on the deserialized messages",
>>         new MyOtherTransform());
>>
>> I'd be interested in contributing a patch to add this functionality,
>> perhaps as a static method PTransform.compose(). Would that patch be
>> welcome? Are there other thoughts on naming?
>>
>> The full code of the CompositeTransform interface we're currently using
>> is included below.
>>
>>
>> public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
>>   OutputT expand(InputT input);
>>
>>   /**
>>    * The public factory method that serves as the entrypoint for users
>>    * to create a composite PTransform.
>>    */
>>   static <InputT extends PInput, OutputT extends POutput>
>>       PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
>>     return new PTransform<InputT, OutputT>() {
>>       @Override
>>       public OutputT expand(InputT input) {
>>         return transform.expand(input);
>>       }
>>     };
>>   }
>> }
>>
>>
>>
>>


Re: KafkaIO needs access to the brokers even before the pipeline reach the worker

2018-09-19 Thread Raghu Angadi
This access is needed in order to determine the number of partitions for a
topic. If you already know the number of partitions, you can provide a list
of partitions manually, which avoids accessing the Kafka cluster on the client.
See example usage at [1].

It is feasible to avoid this access with some deterministic assignment of
partitions on the workers for each input split. We might look into it if
'withTopicPartitions()' does not help in most of these cases.

[1]:
https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L380
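
For reference, a minimal sketch of pinning the partitions explicitly (the
broker addresses, topic name, partition count of 4, and String deserializers
here are placeholder assumptions):

import java.util.Arrays;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Listing the partitions up front means KafkaIO does not need to contact
// the brokers at pipeline-construction time to discover the partition count.
KafkaIO.<String, String>read()
    .withBootstrapServers("broker1:9092,broker2:9092")
    .withTopicPartitions(Arrays.asList(
        new TopicPartition("my_topic", 0),
        new TopicPartition("my_topic", 1),
        new TopicPartition("my_topic", 2),
        new TopicPartition("my_topic", 3)))
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class);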

On Wed, Sep 19, 2018 at 11:59 AM Juan Carlos Garcia 
wrote:

> Hi folks, we have a pipeline for Dataflow, and our Google Cloud environment
> has a private network where the pipeline should run; this network
> interconnects via an IPsec tunnel to an AWS environment where the Kafka
> brokers are running.
>
> We have found that in order to be able to submit the pipeline we have to
> do it from a machine that has access to the Kafka brokers.
>
> Is there a way to avoid that?
>
> Why can't KafkaIO defer the communication to the brokers until the
> pipeline is on the worker node?
>
> Thanks and regards,
> JC
>


KafkaIO needs access to the brokers even before the pipeline reach the worker

2018-09-19 Thread Juan Carlos Garcia
Hi folks, we have a pipeline for Dataflow, and our Google Cloud environment
has a private network where the pipeline should run; this network
interconnects via an IPsec tunnel to an AWS environment where the Kafka
brokers are running.

We have found that in order to be able to submit the pipeline we have to do
it from a machine that has access to the Kafka brokers.

Is there a way to avoid that?

Why can't KafkaIO defer the communication to the brokers until the
pipeline is on the worker node?

Thanks and regards,
JC


Re: Problem with KafkaIO

2018-09-19 Thread Raghu Angadi
On Wed, Sep 19, 2018 at 11:24 AM Juan Carlos Garcia 
wrote:

> Sorry, I hit the send button too fast... The error occurs in the worker.
>

Np. Just one more comment on it: it is a very important design/correctness
decision for the runner to decide how to handle persistent errors in a
streaming pipeline. Dataflow keeps failing since there is no way to
restart a pipeline from scratch without losing exactly-once guarantees. It
lets the user decide if the pipeline needs to be 'upgraded'.

Raghu.

>
> Juan Carlos Garcia  wrote on Wed, Sep 19, 2018,
> 20:22:
>
>> Sorry for hijacking the thread; we are running Spark on top of YARN. YARN
>> retries multiple times until it reaches its max attempts and then gives up.
>>
>> Raghu Angadi  wrote on Wed, Sep 19, 2018, 18:58:
>>
>>> On Wed, Sep 19, 2018 at 7:26 AM Juan Carlos Garcia 
>>> wrote:
>>>
 Don't know if it's related, but we have seen our pipeline dying (using
 SparkRunner) when there is a problem with Kafka (network interruptions),
 with errors like: org.apache.kafka.common.errors.TimeoutException: Timeout
 expired while fetching topic metadata

 Maybe this will fix it as well. Thanks Raghu for the hint about
 withConsumerFactoryFn.

>>>
>>> Wouldn't that be retried by the SparkRunner if it happens on the worker?
>>> Or does it happen while launching the pipeline on the client?
>>>
>>>
>>>



 On Wed, Sep 19, 2018 at 3:29 PM Eduardo Soldera <
 eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, thank you.
>
> I'm not sure though what to pass as an argument:
>
> KafkaIO.read[String,String]()
>   .withBootstrapServers(server)
>   .withTopic(topic)
>   .withKeyDeserializer(classOf[StringDeserializer])
>   .withValueDeserializer(classOf[StringDeserializer])
>   .withConsumerFactoryFn(new KafkaExecutor.ConsumerFactoryFn())
>   .updateConsumerProperties(properties)
>   .commitOffsetsInFinalize()
>   .withoutMetadata()
>
>
> Regards
>
>
> On Tue, Sep 18, 2018 at 21:15, Raghu Angadi 
> wrote:
>
>> Hi Eduardo,
>>
>> There is another workaround you can try without having to wait for the
>> 2.7.0 release: use a wrapper to catch exceptions from KafkaConsumer#poll()
>> and pass the wrapper to withConsumerFactoryFn() for the KafkaIO reader [1].
>>
>> Using something like (such a wrapper is used in KafkaIO tests [2]):
>> private static class ConsumerFactoryFn
>>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>>   @Override
>>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>     return new KafkaConsumer<byte[], byte[]>(config) {
>>       @Override
>>       public ConsumerRecords<byte[], byte[]> poll(long timeout) {
>>         // work around for BEAM-5375
>>         while (true) {
>>           try {
>>             return super.poll(timeout);
>>           } catch (Exception e) {
>>             // LOG & sleep for a sec
>>           }
>>         }
>>       }
>>     };
>>   }
>> }
>>
>> [1]:
>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
>> [2]:
>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261
>>
>> On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu, we're not sure how long the network was down. According to
>>> the logs, no longer than one minute. A 30-second shutdown would work
>>> for the tests.
>>>
>>> Regards
>>>
>>> On Fri, Sep 14, 2018 at 21:41, Raghu Angadi 
>>> wrote:
>>>
 Thanks. I could repro myself as well. How long was the network
 down?

 Trying to get the fix into 2.7 RC2.

 On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <
 eduardo.sold...@arquivei.com.br> wrote:

> Just to make myself clear, I'm not sure how to use the patch, but
> if you could send us some guidance, that would be great.
>
> On Fri, Sep 14, 2018 at 16:24, Eduardo Soldera <
> eduardo.sold...@arquivei.com.br> wrote:
>
>> Hi Raghu, yes, it is feasible, would you do that for us? I'm not
>> sure how we'd use the patch. We're using SBT and Spotify's Scio with 
>> Scala.
>>
>> Thanks
>>
>> On Fri, Sep 14, 2018 at 16:07, Raghu Angadi <
>> rang...@google.com> wrote:
>>
>>> Is it feasible for you to verify the fix in your dev job? I can
>>> make a patch against the Beam 2.4 branch if you like.
>>>
>>> Raghu.
>>>
>>> On Fri, Sep 14, 2018 at 11:14 AM Eduardo Soldera <

Re: Problem with KafkaIO

2018-09-19 Thread Juan Carlos Garcia
Sorry, I hit the send button too fast... The error occurs in the worker.

Juan Carlos Garcia  wrote on Wed, Sep 19, 2018,
20:22:

> Sorry for hijacking the thread; we are running Spark on top of YARN. YARN
> retries multiple times until it reaches its max attempts and then gives up.
>
> Raghu Angadi  wrote on Wed, Sep 19, 2018, 18:58:
>
>> On Wed, Sep 19, 2018 at 7:26 AM Juan Carlos Garcia 
>> wrote:
>>
>>> Don't know if it's related, but we have seen our pipeline dying (using
>>> SparkRunner) when there is a problem with Kafka (network interruptions),
>>> with errors like: org.apache.kafka.common.errors.TimeoutException: Timeout
>>> expired while fetching topic metadata
>>>
>>> Maybe this will fix it as well. Thanks Raghu for the hint about
>>> withConsumerFactoryFn.
>>>
>>
>> Wouldn't that be retried by the SparkRunner if it happens on the worker?
>> Or does it happen while launching the pipeline on the client?
>>
>>
>>
>>>
>>>
>>>
>>> On Wed, Sep 19, 2018 at 3:29 PM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
 Hi Raghu, thank you.

 I'm not sure though what to pass as an argument:

 KafkaIO.read[String,String]()
   .withBootstrapServers(server)
   .withTopic(topic)
   .withKeyDeserializer(classOf[StringDeserializer])
   .withValueDeserializer(classOf[StringDeserializer])
   .withConsumerFactoryFn(new KafkaExecutor.ConsumerFactoryFn())
   .updateConsumerProperties(properties)
   .commitOffsetsInFinalize()
   .withoutMetadata()


 Regards


 On Tue, Sep 18, 2018 at 21:15, Raghu Angadi 
 wrote:

> Hi Eduardo,
>
> There is another workaround you can try without having to wait for the 2.7.0
> release: use a wrapper to catch exceptions from KafkaConsumer#poll() and
> pass the wrapper to withConsumerFactoryFn() for the KafkaIO reader [1].
>
> Using something like (such a wrapper is used in KafkaIO tests [2]):
> private static class ConsumerFactoryFn
>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>   @Override
>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>     return new KafkaConsumer<byte[], byte[]>(config) {
>       @Override
>       public ConsumerRecords<byte[], byte[]> poll(long timeout) {
>         // work around for BEAM-5375
>         while (true) {
>           try {
>             return super.poll(timeout);
>           } catch (Exception e) {
>             // LOG & sleep for a sec
>           }
>         }
>       }
>     };
>   }
> }
>
> [1]:
> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
> [2]:
> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261
>
> On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <
> eduardo.sold...@arquivei.com.br> wrote:
>
>> Hi Raghu, we're not sure how long the network was down. According to
>> the logs, no longer than one minute. A 30-second shutdown would work
>> for the tests.
>>
>> Regards
>>
>> On Fri, Sep 14, 2018 at 21:41, Raghu Angadi 
>> wrote:
>>
>>> Thanks. I could repro myself as well. How long was the network down?
>>>
>>> Trying to get the fix into 2.7 RC2.
>>>
>>> On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
 Just to make myself clear, I'm not sure how to use the patch, but
 if you could send us some guidance, that would be great.

 On Fri, Sep 14, 2018 at 16:24, Eduardo Soldera <
 eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, yes, it is feasible, would you do that for us? I'm not
> sure how we'd use the patch. We're using SBT and Spotify's Scio with 
> Scala.
>
> Thanks
>
> On Fri, Sep 14, 2018 at 16:07, Raghu Angadi <
> rang...@google.com> wrote:
>
>> Is it feasible for you to verify the fix in your dev job? I can
>> make a patch against Beam 2.4 branch if you like.
>>
>> Raghu.
>>
>> On Fri, Sep 14, 2018 at 11:14 AM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu, thank you very much for the pull request.
>>> We'll wait for the 2.7 Beam release.
>>>
>>> Regards!
>>>
>>> On Thu, Sep 13, 2018 at 18:19, Raghu Angadi <
>>> rang...@google.com> wrote:
>>>
 Fix: https://github.com/apache/beam/pull/6391

 On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi <
 rang...@google.com> wrote:

> Filed BEAM-5375
> 

Re: Problem with KafkaIO

2018-09-19 Thread Juan Carlos Garcia
Sorry for hijacking the thread; we are running Spark on top of YARN. YARN
retries multiple times until it reaches its max attempts and then gives up.

Raghu Angadi  wrote on Wed, Sep 19, 2018, 18:58:

> On Wed, Sep 19, 2018 at 7:26 AM Juan Carlos Garcia 
> wrote:
>
>> Don't know if it's related, but we have seen our pipeline dying (using
>> SparkRunner) when there is a problem with Kafka (network interruptions),
>> with errors like: org.apache.kafka.common.errors.TimeoutException: Timeout
>> expired while fetching topic metadata
>>
>> Maybe this will fix it as well. Thanks Raghu for the hint about
>> withConsumerFactoryFn.
>>
>
> Wouldn't that be retried by the SparkRunner if it happens on the worker?
> Or does it happen while launching the pipeline on the client?
>
>
>
>>
>>
>>
>> On Wed, Sep 19, 2018 at 3:29 PM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu, thank you.
>>>
>>> I'm not sure though what to pass as an argument:
>>>
>>> KafkaIO.read[String,String]()
>>>   .withBootstrapServers(server)
>>>   .withTopic(topic)
>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>   .withConsumerFactoryFn(new KafkaExecutor.ConsumerFactoryFn())
>>>   .updateConsumerProperties(properties)
>>>   .commitOffsetsInFinalize()
>>>   .withoutMetadata()
>>>
>>>
>>> Regards
>>>
>>>
>>> On Tue, Sep 18, 2018 at 21:15, Raghu Angadi 
>>> wrote:
>>>
 Hi Eduardo,

 There is another workaround you can try without having to wait for the 2.7.0
 release: use a wrapper to catch exceptions from KafkaConsumer#poll() and
 pass the wrapper to withConsumerFactoryFn() for the KafkaIO reader [1].

 Using something like (such a wrapper is used in KafkaIO tests [2]):
 private static class ConsumerFactoryFn
     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
   @Override
   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
     return new KafkaConsumer<byte[], byte[]>(config) {
       @Override
       public ConsumerRecords<byte[], byte[]> poll(long timeout) {
         // work around for BEAM-5375
         while (true) {
           try {
             return super.poll(timeout);
           } catch (Exception e) {
             // LOG & sleep for a sec
           }
         }
       }
     };
   }
 }

 [1]:
 https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
 [2]:
 https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261

 On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <
 eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, we're not sure how long the network was down. According to
> the logs, no longer than one minute. A 30-second shutdown would work
> for the tests.
>
> Regards
>
> On Fri, Sep 14, 2018 at 21:41, Raghu Angadi 
> wrote:
>
>> Thanks. I could repro myself as well. How long was the network down?
>>
>> Trying to get the fix into 2.7 RC2.
>>
>> On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Just to make myself clear, I'm not sure how to use the patch, but if
>>> you could send us some guidance, that would be great.
>>>
>>> On Fri, Sep 14, 2018 at 16:24, Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
 Hi Raghu, yes, it is feasible, would you do that for us? I'm not
 sure how we'd use the patch. We're using SBT and Spotify's Scio with 
 Scala.

 Thanks

 On Fri, Sep 14, 2018 at 16:07, Raghu Angadi <
 rang...@google.com> wrote:

> Is it feasible for you to verify the fix in your dev job? I can
> make a patch against Beam 2.4 branch if you like.
>
> Raghu.
>
> On Fri, Sep 14, 2018 at 11:14 AM Eduardo Soldera <
> eduardo.sold...@arquivei.com.br> wrote:
>
>> Hi Raghu, thank you very much for the pull request.
>> We'll wait for the 2.7 Beam release.
>>
>> Regards!
>>
>> On Thu, Sep 13, 2018 at 18:19, Raghu Angadi <
>> rang...@google.com> wrote:
>>
>>> Fix: https://github.com/apache/beam/pull/6391
>>>
>>> On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi 
>>> wrote:
>>>
 Filed BEAM-5375
 . I will fix
 it later this week.

 On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi <
 rang...@google.com> wrote:

>
>
> On Wed, Sep 12, 2018 at 12:11 PM Ra

Re: BEAM-2059 out of sync with docs?

2018-09-19 Thread Lukasz Cwik
Only attempted metrics are supported in Dataflow streaming. Committed
metrics aren't supported.
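
For anyone checking this in code: a rough sketch of querying a counter and
reading only its attempted value (the namespace/name strings and the
pipeline variable are placeholder assumptions):

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

PipelineResult result = pipeline.run();
MetricQueryResults metrics =
    result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named("my_namespace", "my_counter"))
            .build());
for (MetricResult<Long> counter : metrics.getCounters()) {
  // getAttempted() works on Dataflow streaming; getCommitted() would be
  // expected to fail there since committed metrics are not supported.
  System.out.println(counter.getName() + ": " + counter.getAttempted());
}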

On Wed, Sep 19, 2018 at 5:14 AM Vince Gonzalez 
wrote:

> Hi,
>
> The runner compatibility matrix says the following for Metrics / Dataflow:
>
> Metrics are not yet supported at all in streaming mode, but this support
> is coming soon ([BEAM-2059]
>
> However, BEAM-2059 appears to be resolved as Fixed as of 2.1.0. Is the
> compatibility matrix out of date, or does resolution of BEAM-2059 not
> entirely provide metrics in streaming mode?
>
> --vince
>
> --
>
> •  Vince Gonzalez
> •  Customer Engineer, Big Data, Google Cloud
>
> •  vincegonza...@google.com
> •  212-694-3879
>
>
>
>


Re: Problem with KafkaIO

2018-09-19 Thread Raghu Angadi
On Wed, Sep 19, 2018 at 7:26 AM Juan Carlos Garcia 
wrote:

> Don't know if it's related, but we have seen our pipeline dying (using
> SparkRunner) when there is a problem with Kafka (network interruptions),
> with errors like: org.apache.kafka.common.errors.TimeoutException: Timeout
> expired while fetching topic metadata
>
> Maybe this will fix it as well. Thanks Raghu for the hint about
> withConsumerFactoryFn.
>

Wouldn't that be retried by the SparkRunner if it happens on the worker? Or
does it happen while launching the pipeline on the client?



>
>
>
> On Wed, Sep 19, 2018 at 3:29 PM Eduardo Soldera <
> eduardo.sold...@arquivei.com.br> wrote:
>
>> Hi Raghu, thank you.
>>
>> I'm not sure though what to pass as an argument:
>>
>> KafkaIO.read[String,String]()
>>   .withBootstrapServers(server)
>>   .withTopic(topic)
>>   .withKeyDeserializer(classOf[StringDeserializer])
>>   .withValueDeserializer(classOf[StringDeserializer])
>>   .withConsumerFactoryFn(new KafkaExecutor.ConsumerFactoryFn())
>>   .updateConsumerProperties(properties)
>>   .commitOffsetsInFinalize()
>>   .withoutMetadata()
>>
>>
>> Regards
>>
>>
>> On Tue, Sep 18, 2018 at 21:15, Raghu Angadi 
>> wrote:
>>
>>> Hi Eduardo,
>>>
>>> There is another workaround you can try without having to wait for the 2.7.0
>>> release: use a wrapper to catch exceptions from KafkaConsumer#poll() and
>>> pass the wrapper to withConsumerFactoryFn() for the KafkaIO reader [1].
>>>
>>> Using something like (such a wrapper is used in KafkaIO tests [2]):
>>> private static class ConsumerFactoryFn
>>>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>>>   @Override
>>>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>>     return new KafkaConsumer<byte[], byte[]>(config) {
>>>       @Override
>>>       public ConsumerRecords<byte[], byte[]> poll(long timeout) {
>>>         // work around for BEAM-5375
>>>         while (true) {
>>>           try {
>>>             return super.poll(timeout);
>>>           } catch (Exception e) {
>>>             // LOG & sleep for a sec
>>>           }
>>>         }
>>>       }
>>>     };
>>>   }
>>> }
>>>
>>> [1]:
>>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
>>> [2]:
>>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261
>>>
>>> On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
 Hi Raghu, we're not sure how long the network was down. According to
 the logs, no longer than one minute. A 30-second shutdown would work for the
 tests.

 Regards

 On Fri, Sep 14, 2018 at 21:41, Raghu Angadi 
 wrote:

> Thanks. I could repro myself as well. How long was the network down?
>
> Trying to get the fix into 2.7 RC2.
>
> On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <
> eduardo.sold...@arquivei.com.br> wrote:
>
>> Just to make myself clear, I'm not sure how to use the patch, but if
>> you could send us some guidance, that would be great.
>>
>> On Fri, Sep 14, 2018 at 16:24, Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu, yes, it is feasible, would you do that for us? I'm not
>>> sure how we'd use the patch. We're using SBT and Spotify's Scio with 
>>> Scala.
>>>
>>> Thanks
>>>
>>> On Fri, Sep 14, 2018 at 16:07, Raghu Angadi 
>>> wrote:
>>>
 Is it feasible for you to verify the fix in your dev job? I can
 make a patch against Beam 2.4 branch if you like.

 Raghu.

 On Fri, Sep 14, 2018 at 11:14 AM Eduardo Soldera <
 eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, thank you very much for the pull request.
> We'll wait for the 2.7 Beam release.
>
> Regards!
>
> On Thu, Sep 13, 2018 at 18:19, Raghu Angadi <
> rang...@google.com> wrote:
>
>> Fix: https://github.com/apache/beam/pull/6391
>>
>> On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi 
>> wrote:
>>
>>> Filed BEAM-5375
>>> . I will fix
>>> it later this week.
>>>
>>> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi <
>>> rang...@google.com> wrote:
>>>


 On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <
 rang...@google.com> wrote:

> Thanks for the job id, I looked at the worker logs (following
> usual support oncall access protocol that provides temporary 
> access to
> things like logs in GCP):
>
> Root issue looks like consumerPollLoop() mentioned earlier
>>

Re: Problem with KafkaIO

2018-09-19 Thread Raghu Angadi
You would pass a SerializableFunction (with whatever args are required to
create the object of this functor, if any). E.g. ConsumerFactoryFn()
defined in my pseudocode below does not need any args. So it would be:

KafkaIO.read()
   ...
   .withConsumerFactoryFn(new ConsumerFactoryFn())
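
A fuller sketch of the wiring, for anyone following along (the broker
address, topic name, and pipeline variable are placeholder assumptions;
ConsumerFactoryFn is the poll()-retrying wrapper quoted below):

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers("broker1:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // The factory returns a Consumer<byte[], byte[]> regardless of the
        // key/value types above; deserialization happens inside KafkaIO.
        .withConsumerFactoryFn(new ConsumerFactoryFn())
        .withoutMetadata());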






On Wed, Sep 19, 2018 at 6:29 AM Eduardo Soldera <
eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, thank you.
>
> I'm not sure though what to pass as an argument:
>
> KafkaIO.read[String,String]()
>   .withBootstrapServers(server)
>   .withTopic(topic)
>   .withKeyDeserializer(classOf[StringDeserializer])
>   .withValueDeserializer(classOf[StringDeserializer])
>   .withConsumerFactoryFn(new KafkaExecutor.ConsumerFactoryFn())
>   .updateConsumerProperties(properties)
>   .commitOffsetsInFinalize()
>   .withoutMetadata()
>
>
> Regards
>
>
> On Tue, Sep 18, 2018 at 21:15, Raghu Angadi 
> wrote:
>
>> Hi Eduardo,
>>
>> There is another workaround you can try without having to wait for the 2.7.0
>> release: use a wrapper to catch exceptions from KafkaConsumer#poll() and
>> pass the wrapper to withConsumerFactoryFn() for the KafkaIO reader [1].
>>
>> Using something like (such a wrapper is used in KafkaIO tests [2]):
>> private static class ConsumerFactoryFn
>>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>>   @Override
>>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>     return new KafkaConsumer<byte[], byte[]>(config) {
>>       @Override
>>       public ConsumerRecords<byte[], byte[]> poll(long timeout) {
>>         // work around for BEAM-5375
>>         while (true) {
>>           try {
>>             return super.poll(timeout);
>>           } catch (Exception e) {
>>             // LOG & sleep for a sec
>>           }
>>         }
>>       }
>>     };
>>   }
>> }
>>
>> [1]:
>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
>> [2]:
>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261
>>
>> On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu, we're not sure how long the network was down. According to the
>>> logs, no longer than one minute. A 30-second shutdown would work for the
>>> tests.
>>>
>>> Regards
>>>
>>> On Fri, Sep 14, 2018 at 21:41, Raghu Angadi 
>>> wrote:
>>>
 Thanks. I could repro myself as well. How long was the network down?

 Trying to get the fix into 2.7 RC2.

 On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <
 eduardo.sold...@arquivei.com.br> wrote:

> Just to make myself clear, I'm not sure how to use the patch, but if
> you could send us some guidance, that would be great.
>
> On Fri, Sep 14, 2018 at 16:24, Eduardo Soldera <
> eduardo.sold...@arquivei.com.br> wrote:
>
>> Hi Raghu, yes, it is feasible, would you do that for us? I'm not sure
>> how we'd use the patch. We're using SBT and Spotify's Scio with Scala.
>>
>> Thanks
>>
>> On Fri, Sep 14, 2018 at 16:07, Raghu Angadi 
>> wrote:
>>
>>> Is it feasible for you to verify the fix in your dev job? I can make
>>> a patch against Beam 2.4 branch if you like.
>>>
>>> Raghu.
>>>
>>> On Fri, Sep 14, 2018 at 11:14 AM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
 Hi Raghu, thank you very much for the pull request.
 We'll wait for the 2.7 Beam release.

 Regards!

 On Thu, Sep 13, 2018 at 18:19, Raghu Angadi <
 rang...@google.com> wrote:

> Fix: https://github.com/apache/beam/pull/6391
>
> On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi 
> wrote:
>
>> Filed BEAM-5375 .
>> I will fix it later this week.
>>
>> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi 
>> wrote:
>>
>>>
>>>
>>> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <
>>> rang...@google.com> wrote:
>>>
 Thanks for the job id, I looked at the worker logs (following
 usual support oncall access protocol that provides temporary 
 access to
 things like logs in GCP):

 Root issue looks like consumerPollLoop() mentioned earlier
 needs to handle unchecked exception. In your case it is clear that 
 poll
 thread exited with a runtime exception. The reader does not check 
 for it
 and continues to wait for poll thread to enqueue messages. A fix 
 should
 result in an IOException for read from the source. The runners 
 will handle
 that a

Re: Problem with KafkaIO

2018-09-19 Thread Juan Carlos Garcia
Don't know if it's related, but we have seen our pipeline dying (using
SparkRunner) when there is a problem with Kafka (network interruptions),
with errors like: org.apache.kafka.common.errors.TimeoutException: Timeout
expired while fetching topic metadata

Maybe this will fix it as well. Thanks Raghu for the hint about
withConsumerFactoryFn.




On Wed, Sep 19, 2018 at 3:29 PM Eduardo Soldera <
eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, thank you.
>
> I'm not sure though what to pass as an argument:
>
> KafkaIO.read[String,String]()
>   .withBootstrapServers(server)
>   .withTopic(topic)
>   .withKeyDeserializer(classOf[StringDeserializer])
>   .withValueDeserializer(classOf[StringDeserializer])
>   .withConsumerFactoryFn(new KafkaExecutor.ConsumerFactoryFn())
>   .updateConsumerProperties(properties)
>   .commitOffsetsInFinalize()
>   .withoutMetadata()
>
>
> Regards
>
>
> On Tue, Sep 18, 2018 at 21:15, Raghu Angadi 
> wrote:
>
>> Hi Eduardo,
>>
>> There is another workaround you can try without having to wait for the 2.7.0
>> release: use a wrapper to catch exceptions from KafkaConsumer#poll() and
>> pass the wrapper to withConsumerFactoryFn() for the KafkaIO reader [1].
>>
>> Using something like (such a wrapper is used in KafkaIO tests [2]):
>> private static class ConsumerFactoryFn
>>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>>   @Override
>>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>>     return new KafkaConsumer<byte[], byte[]>(config) {
>>       @Override
>>       public ConsumerRecords<byte[], byte[]> poll(long timeout) {
>>         // work around for BEAM-5375
>>         while (true) {
>>           try {
>>             return super.poll(timeout);
>>           } catch (Exception e) {
>>             // LOG & sleep for a sec
>>           }
>>         }
>>       }
>>     };
>>   }
>> }
>>
>> [1]:
>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
>> [2]:
>> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261
>>
>> On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu, we're not sure how long the network was down. According to the
>>> logs, no longer than one minute. A 30-second shutdown would work for the
>>> tests.
>>>
>>> Regards
>>>
>>> On Fri, Sep 14, 2018 at 21:41, Raghu Angadi 
>>> wrote:
>>>
 Thanks. I could repro myself as well. How long was the network down?

 Trying to get the fix into 2.7 RC2.

 On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <
 eduardo.sold...@arquivei.com.br> wrote:

> Just to make myself clear, I'm not sure how to use the patch, but if
> you could send us some guidance, that would be great.
>
> On Fri, Sep 14, 2018 at 16:24, Eduardo Soldera <
> eduardo.sold...@arquivei.com.br> wrote:
>
>> Hi Raghu, yes, it is feasible, would you do that for us? I'm not sure
>> how we'd use the patch. We're using SBT and Spotify's Scio with Scala.
>>
>> Thanks
>>
>> On Fri, Sep 14, 2018 at 16:07, Raghu Angadi 
>> wrote:
>>
>>> Is it feasible for you to verify the fix in your dev job? I can make
>>> a patch against Beam 2.4 branch if you like.
>>>
>>> Raghu.
>>>
>>> On Fri, Sep 14, 2018 at 11:14 AM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
 Hi Raghu, thank you very much for the pull request.
 We'll wait for the 2.7 Beam release.

 Regards!

 On Thu, Sep 13, 2018 at 18:19, Raghu Angadi <
 rang...@google.com> wrote:

> Fix: https://github.com/apache/beam/pull/6391
>
> On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi 
> wrote:
>
>> Filed BEAM-5375 .
>> I will fix it later this week.
>>
>> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi 
>> wrote:
>>
>>>
>>>
>>> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <
>>> rang...@google.com> wrote:
>>>
 Thanks for the job id, I looked at the worker logs (following
 usual support oncall access protocol that provides temporary 
 access to
 things like logs in GCP):

 Root issue looks like consumerPollLoop() mentioned earlier
 needs to handle unchecked exception. In your case it is clear that 
 poll
 thread exited with a runtime exception. The reader does not check 
 for it
 and continues to wait for poll thread to enqueue messages. A fix 
 should
 result in an IOException for read from the source. The runners 
 will h

Re: Problem with KafkaIO

2018-09-19 Thread Eduardo Soldera
Hi Raghu, thank you.

I'm not sure though what to pass as an argument:

KafkaIO.read[String,String]()
  .withBootstrapServers(server)
  .withTopic(topic)
  .withKeyDeserializer(classOf[StringDeserializer])
  .withValueDeserializer(classOf[StringDeserializer])
  .withConsumerFactoryFn(new KafkaExecutor.ConsumerFactoryFn())
  .updateConsumerProperties(properties)
  .commitOffsetsInFinalize()
  .withoutMetadata()


Regards


On Tue, Sep 18, 2018 at 21:15, Raghu Angadi 
wrote:

> Hi Eduardo,
>
> There is another workaround you can try without having to wait for the 2.7.0
> release: use a wrapper to catch exceptions from KafkaConsumer#poll() and
> pass the wrapper to withConsumerFactoryFn() for the KafkaIO reader [1].
>
> Using something like (such a wrapper is used in KafkaIO tests [2]):
> private static class ConsumerFactoryFn
>     implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
>   @Override
>   public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
>     return new KafkaConsumer<byte[], byte[]>(config) {
>       @Override
>       public ConsumerRecords<byte[], byte[]> poll(long timeout) {
>         // work around for BEAM-5375
>         while (true) {
>           try {
>             return super.poll(timeout);
>           } catch (Exception e) {
>             // LOG & sleep for a sec
>           }
>         }
>       }
>     };
>   }
> }
>
> [1]:
> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
> [2]:
> https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261
>
> On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <
> eduardo.sold...@arquivei.com.br> wrote:
>
>> Hi Raghu, we're not sure how long the network was down. According to the
>> logs, no longer than one minute. A 30-second shutdown would work for the
>> tests.
>>
>> Regards
>>
>> On Fri, Sep 14, 2018 at 21:41, Raghu Angadi 
>> wrote:
>>
>>> Thanks. I could repro myself as well. How long was the network down?
>>>
>>> Trying to get the fix into 2.7 RC2.
>>>
>>> On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
 Just to make myself clear, I'm not sure how to use the patch, but if you
 could send us some guidance, that would be great.

 On Fri, Sep 14, 2018 at 16:24, Eduardo Soldera <
 eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, yes, it is feasible, would you do that for us? I'm not sure
> how we'd use the patch. We're using SBT and Spotify's Scio with Scala.
>
> Thanks
>
> On Fri, Sep 14, 2018 at 16:07, Raghu Angadi 
> wrote:
>
>> Is it feasible for you to verify the fix in your dev job? I can make
>> a patch against Beam 2.4 branch if you like.
>>
>> Raghu.
>>
>> On Fri, Sep 14, 2018 at 11:14 AM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu, thank you very much for the pull request.
>>> We'll wait for the 2.7 Beam release.
>>>
>>> Regards!
>>>
>>> On Thu, Sep 13, 2018 at 18:19, Raghu Angadi 
>>> wrote:
>>>
 Fix: https://github.com/apache/beam/pull/6391

 On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi 
 wrote:

> Filed BEAM-5375 .
> I will fix it later this week.
>
> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi 
> wrote:
>
>>
>>
>> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi 
>> wrote:
>>
>>> Thanks for the job id. I looked at the worker logs (following the
>>> usual support oncall access protocol that provides temporary access
>>> to things like logs in GCP):
>>>
>>> The root issue looks like consumerPollLoop() mentioned earlier needs
>>> to handle unchecked exceptions. In your case it is clear that the
>>> poll thread exited with a runtime exception. The reader does not
>>> check for it and continues to wait for the poll thread to enqueue
>>> messages. A fix should result in an IOException for reads from the
>>> source. The runners will handle that appropriately after that. I
>>> will file a jira.
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>>
>>
>> Ignore the link.. was pasted here by mistake.
>>
>>
>>>
>>> From the logs (with a comment below each one):
>>>
>>>- 2018-09-12 06:13:07.345 PDT Reader-0: reading from
>>>kafka_topic-0 starting at offset 2
>>>   - Implies the reader is initialized and poll thread is
>>>   sta

Re: Acknowledging Pubsub messages in Flink Runner

2018-09-19 Thread Valeri Tsolov
Hey Max,
I think it is possible, but I'm not sure when we are going to plan such an
activity. Will get back to you ASAP.

Thanks,
Valeri

On Mon, Sep 17, 2018 at 19:27, Maximilian Michels  wrote:

> I wonder if you could do some profiling on the TaskManagers and see
> where they spend most of their time? That would be very helpful. If it
> is indeed `finalizeCheckpoint`, then we could introduce asynchronous
> acknowledgement. If it is in `snapshotState`, then we know that the
> bottleneck is there.
>
> Do you think profiling on the TaskManagers would be feasible?
>
> Another question: Did you activate asynchronous snapshots?
>
> Thanks,
> Max
>
> On 17.09.18 17:15, Encho Mishinev wrote:
> > Hi Max,
> >
> > I agree that the problem might not be in the acknowledgement itself. A
> > very long checkpoint could go past the subscription acknowledgement
> > deadline (10min is the maximum allowed) and hence the message might be
> > resent yielding the behaviour we see.
> >
> > In any way, the extreme slow down of checkpoints still remains
> > unexplained. This occurs even if the job simply reads from Pubsub and
> > does nothing else.
> >
> > We do use FsStateBackend using HDFS. The whole setup is deployed in
> > Kubernetes. Any ideas of why this might be happening would be of great
> help.
> >
> > Thanks,
> > Encho
> >
> > On Mon, Sep 17, 2018 at 4:15 PM Maximilian Michels wrote:
> >
> > Hi Encho,
> >
> > Thanks for providing more insight into this. I've re-examined the
> > checkpointing code and couldn't find anything suspicious there.
> >
> >   > The first job I stopped right when it processed more messages
> than I
> >   > had loaded. The subscription afterwards had 52 000 unacknowledged
> >   > messages.
> >
> > That does sound suspicious with a parallelism of 52, but your other
> > experiments don't confirm that there is something wrong with the
> > acknowledgment. Rather, it seems the checkpointing itself is taking
> > longer and longer. This could also be caused by long acknowledgments,
> > since this stalls in-progress checkpoints.
> >
> > Please check the Web UI for statistics about the checkpoints:
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/checkpoint_monitoring.html
> >
> >
> >
> > You're going through a lot of messages in between the checkpoints.
> > Which
> > state backend do you use? Please try re-running your job with the
> file
> > system state backend (FsStateBackend) or the RocksDB state backend
> > (RocksDBStateBackend). For the RocksDB state backend you will have to
> > add the RocksDB dependency. The file system backend should work out
> of
> > the box, just specify a path and set
> > FlinkPipelineOptions#setStateBackend(..). See:
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html
> >
> > Next, I could supply you with a custom Beam version which logs more
> > debug information.
> >
> > Best,
> > Max
> >
> > On 13.09.18 16:40, Encho Mishinev wrote:
> >  > Hello Max,
> >  >
> >  > I am currently performing more tests on it and will follow-up with
> >  > anything I find.
> >  >
> >  > Currently I have the following observations:
> >  >
> >  > Whenever there are few (relative to the parallelism) messages
> > left in a
> >  > pubsub topic the checkpointing length becomes very long. I have
> > tried
> >  > this with different parallelism. My usual set for testing is 13
> task
> >  > managers with 4 task slots eac, 52 parallelism for the job and
> >  > checkpointing every 60s. I've done three runs on a subscription
> > filled
> >  > with about 122,000,000 messages. The job works fast going through
> > about
> >  > 1,500,000 messages/minute until it reaches about 120,000,000 or
> > so, when
> >  > it progressively slows down. Checkpointing length increases from
> an
> >  > average of 50-60s to 2:30min-3min. When about a few hundred
> thousand
> >  > messages are left the job mostly does long checkpoints and no
> work.
> >  > Messages pass through but seemingly forever.
> >  >
> >  > The first job I stopped right when it processed more messages
> > than I had
> >  > loaded. The subscription afterwards had 52 000 unacknowledged
> > messages.
> >  >
> >  > Another job with the same approach had 87 000 unacknowledged
> > messages.
> >  >
> >  > A third job I left over 30 minutes after it had processed more
> > messages
> >  > than I had loaded. It worked very slowly with long checkpoints and
> >  > processed a few hundred thousand messages in total over the 30
> > minute
> >  > period. That subscription then had only 235 unacknowledged
> > messages left.
> >  >
> >  > I have put large acknowledgement deadline for the subscriptions
> > so that
> >  >

BEAM-2059 out of sync with docs?

2018-09-19 Thread Vince Gonzalez
Hi,

The runner compatibility matrix says the following for Metrics / Dataflow:

Metrics are not yet supported at all in streaming mode, but this support is
coming soon ([BEAM-2059]

However, BEAM-2059 appears to be resolved as Fixed as of 2.1.0. Is the
compatibility matrix out of date, or does resolution of BEAM-2059 not
entirely provide metrics in streaming mode?

--vince

-- 

•  Vince Gonzalez
•  Customer Engineer, Big Data, Google Cloud

•  vincegonza...@google.com
•  212-694-3879