Scio 0.5.7 released

2018-08-15 Thread Claire McGinty
Hi all,

We just released Scio 0.5.7. This release uses the latest Beam 2.6.0.

https://github.com/spotify/scio/releases/tag/v0.5.7

*"Panthera pardus"*
Features

   - Bump Beam to 2.6.0 (#1283)
   - Bump Chill to 0.9.3, remove patched BigDecimalSerializer (#1301)
   - Better error messages & help for malformed PipelineOptions (#1297)
   - Add example jobs with refreshing side input (#1295), FileIO#writeDynamic
   (#1278), and stateful DoFns (#1281)
   - Add benchmarks for com.spotify.scio.util.Functions (#1291) and force
   shuffling in other SCollection transform benchmarks (#1290)
   - Benchmark Combine operations (#1291)
   - Use Beam's ReleaseInfo util (#1276) to verify runner SDK
   - Reduce copies in CombineFn (#1268)

Bug fixes

   - Infer correct Tensor shape for scalar features (#1260)
   - Add method to read TensorFlow examples directly from TFRecord files
   (#1298)
   - Fix Kryo's Iterable serialization (#1289)
   - Fix REPL hanging on closeAndCollect for Scala 2.12 (#1277)
   - Fix conflicting transitive dependencies (#1265)
   - ScioTest dependency restricted to test scope in sbt (#1274)


Re: Retry on failures when using KafkaIO and Google Dataflow

2018-08-15 Thread Raghu Angadi
It is due to "enable.autocommit=true".  Auto commit is an option to Kafka
client and how and when it commits is totally out of control of Beam &
KafkaIO.
Could you try setting commitOffsetsInFinalize()[1] in KafkaIO rather than
'enable.autocommit'? That would ensure exactly once processing.
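
A rough sketch of what that could look like, adapted from the pipeline in your
message (assuming the consumer properties keep a group.id and no longer set
enable.auto.commit):

p.apply("Read from kafka", KafkaIO.<String, String>read()
        .withBootstrapServers(server)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(properties)
        // Commit offsets only when Dataflow finalizes a checkpoint, instead of
        // letting the Kafka client auto-commit on its own schedule.
        .commitOffsetsInFinalize()
        .withoutMetadata())
    .apply(Values.create())
    .apply(ParDo.of(new StepTest()));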

That said, you might be interested in understanding why your example failed:
enable.auto.commit is not such a bad option by itself, but there is a corner
case where it can cause issues like this.
When a reader is initialized, its start offset is determined as follows
(specifically in Dataflow, but roughly accurate on other runners too):

   - (a) If there is a checkpoint for the reader split (true for all reads
   except the very first bundle read by the split from Kafka), the offset
   comes from the checkpoint. This is how exactly-once is ensured. Here the
   offset committed by the Kafka client with 'auto commit' does not matter.
   - (b) If there is no checkpoint (i.e. for the first bundle of records),
   KafkaIO does not set any offset explicitly and lets the Kafka client
   decide. That means it depends on your ConsumerConfig, so ConsumerConfig
   decides the offset when a pipeline first starts.

In your example, when there was an exception for message 25, the pipeline was
still processing the first bundle of records and there was no Dataflow
checkpoint, so it kept hitting (b). Kafka's 'auto commit' is out of Beam's
control, and it might have committed offset 60 in one of the retries. The next
retry then incorrectly reads from 60.

I hope this helps. Enabling auto commit is only useful when you want to
restart your pipeline from scratch (rather than 'updating' your Dataflow
pipeline) and still want to *roughly* resume from where the previous
pipeline left off. Even there, commitOffsetsInFinalize() is better. In
either case, exactly-once processing is not guaranteed when a pipeline
restarts; currently the only way to achieve that is to 'update' the pipeline.
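
For reference, a minimal sketch of what launching with Dataflow's 'update'
option looks like in Java (the job name is hypothetical and must match the
already-running job):

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setUpdate(true);            // replace the running job in place, preserving its state
options.setJobName("my-kafka-job"); // hypothetical; must match the existing job's name
Pipeline p = Pipeline.create(options);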

[1]:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L623

Raghu.

On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <
leonardo.mig...@arquivei.com.br> wrote:

> Hello,
>
> I'm using the KafkaIO source in my Beam pipeline, testing the scenario
> where intermittent errors may happen (e.g. DB connection failure) with the
> Dataflow runner.
> So I produced a sequence of 200 messages containing sequential numbers
> (1-200) to a topic, and then executed the following pipeline:
>
> p.apply("Read from kafka", KafkaIO.read()
> .withBootstrapServers(server)
> .withTopic(topic)
> .withKeyDeserializer(StringDeserializer.class)
> .withValueDeserializer(StringDeserializer.class)
> .updateConsumerProperties(properties)
> .withoutMetadata())
> .apply(Values.create())
> .apply(ParDo.of(new StepTest()));
>
> Where StepTest is defined as follows:
>
> public class StepTest extends DoFn {
> @ProcessElement
> public void processElement(ProcessContext pc) {
> String element = pc.element();
>
> if (randomErrorOccurs()) {
> throw new RuntimeException("Failed ... " + element);
> } else {
> LOG.info(element);
> }
> }
> }
>
> The consumer configuration has "enable.auto.commit=true".
> I would expect that all the numbers get printed, and if an exception is
> thrown, Dataflow's runner would retry processing that failed message until
> it eventually works.
> However, what happened in my pipeline was different: when errors started
> happening due to my code, some messages were never processed, and some
> were actually lost forever.
>
> I would expect something like:
>
> {...}
> 22
> 23
> 24
> Failed ... 25
> {A new reader starts}
> Reader-0: first record offset 60
> 61
> 62
> {...}
> {Dataflow retries 25}
> Failed ... 25
> {...}
> and so on... (exception would never cease to happen in this case and
> Dataflow would retry forever)
>
> My output was something like:
>
> {...}
> 22
> 23
> 24
> Failed ... 25
> {A new reader starts}
> Reader-0: first record offset 60
> 61
> 62
> {...}
>
> Message #25 never gets reprocessed, and all the messages up to 60 are
> lost, probably the ones in the same processing bundle as 25. Even more
> curious is that this behaviour doesn't happen when using the PubSubIO
> source, which produces the first mentioned output.
>
> My questions are:
> What is a good way of handling errors with Kafka source if I want all
> messages to be processed exactly once?
> Is there any Kafka or Dataflow configuration that I may be missing?
> Please let me know of your thoughts.
>
> Andre (cc) is part of our team and will be together in this discussion.
>
> --
> []s
>
> Leonardo Alves Miguel
> Data Engineer
> (16) 3509- | www.arquivei.com.br

Re: Retry on failures when using KafkaIO and Google Dataflow

2018-08-15 Thread Lukasz Cwik
This isn't a long-term solution, but inserting a Reshuffle between your IO
and your transforms will ensure that failures in your transforms are
decoupled from the IO.
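
A rough sketch of that, based on the pipeline quoted below (all names come
from that example):

p.apply("Read from kafka", KafkaIO.<String, String>read()
        .withBootstrapServers(server)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(properties)
        .withoutMetadata())
    .apply(Values.create())
    // Materialize the read results here so that a failure in StepTest does
    // not roll back or re-run the Kafka read itself.
    .apply(Reshuffle.viaRandomKey())
    .apply(ParDo.of(new StepTest()));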

The IO should be responsible for correctly pushing back any messages that
would be considered uncommitted in case of a failure. I don't have enough
familiarity with the KafkaIO implementation to provide further guidance,
though.

On Wed, Aug 15, 2018 at 10:14 AM Leonardo Miguel <
leonardo.mig...@arquivei.com.br> wrote:

> Hello,
>
> I'm using the KafkaIO source in my Beam pipeline, testing the scenario
> where intermittent errors may happen (e.g. DB connection failure) with the
> Dataflow runner.
> So I produced a sequence of 200 messages containing sequential numbers
> (1-200) to a topic, and then executed the following pipeline:
>
> p.apply("Read from kafka", KafkaIO.read()
> .withBootstrapServers(server)
> .withTopic(topic)
> .withKeyDeserializer(StringDeserializer.class)
> .withValueDeserializer(StringDeserializer.class)
> .updateConsumerProperties(properties)
> .withoutMetadata())
> .apply(Values.create())
> .apply(ParDo.of(new StepTest()));
>
> Where StepTest is defined as follows:
>
> public class StepTest extends DoFn {
> @ProcessElement
> public void processElement(ProcessContext pc) {
> String element = pc.element();
>
> if (randomErrorOccurs()) {
> throw new RuntimeException("Failed ... " + element);
> } else {
> LOG.info(element);
> }
> }
> }
>
> The consumer configuration has "enable.auto.commit=true".
> I would expect that all the numbers get printed, and if an exception is
> thrown, Dataflow's runner would retry processing that failed message until
> it eventually works.
> However, what happened in my pipeline was different: when errors started
> happening due to my code, some messages were never processed, and some
> were actually lost forever.
>
> I would expect something like:
>
> {...}
> 22
> 23
> 24
> Failed ... 25
> {A new reader starts}
> Reader-0: first record offset 60
> 61
> 62
> {...}
> {Dataflow retries 25}
> Failed ... 25
> {...}
> and so on... (exception would never cease to happen in this case and
> Dataflow would retry forever)
>
> My output was something like:
>
> {...}
> 22
> 23
> 24
> Failed ... 25
> {A new reader starts}
> Reader-0: first record offset 60
> 61
> 62
> {...}
>
> Message #25 never gets reprocessed, and all the messages up to 60 are
> lost, probably the ones in the same processing bundle as 25. Even more
> curious is that this behaviour doesn't happen when using the PubSubIO
> source, which produces the first mentioned output.
>
> My questions are:
> What is a good way of handling errors with Kafka source if I want all
> messages to be processed exactly once?
> Is there any Kafka or Dataflow configuration that I may be missing?
> Please let me know of your thoughts.
>
> Andre (cc) is part of our team and will be together in this discussion.
>
> --
> []s
>
> Leonardo Alves Miguel
> Data Engineer
> (16) 3509- | www.arquivei.com.br


Retry on failures when using KafkaIO and Google Dataflow

2018-08-15 Thread Leonardo Miguel
Hello,

I'm using the KafkaIO source in my Beam pipeline, testing the scenario
where intermittent errors may happen (e.g. DB connection failure) with the
Dataflow runner.
So I produced a sequence of 200 messages containing sequential numbers
(1-200) to a topic, and then executed the following pipeline:

p.apply("Read from kafka", KafkaIO.read()
.withBootstrapServers(server)
.withTopic(topic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(properties)
.withoutMetadata())
.apply(Values.create())
.apply(ParDo.of(new StepTest()));

Where StepTest is defined as follows:

public class StepTest extends DoFn {
    @ProcessElement
    public void processElement(ProcessContext pc) {
        String element = pc.element();

        if (randomErrorOccurs()) {
            throw new RuntimeException("Failed ... " + element);
        } else {
            LOG.info(element);
        }
    }
}

The consumer configuration has "enable.auto.commit=true".
I would expect that all the numbers get printed, and if an exception is
thrown, Dataflow's runner would retry processing that failed message until
it eventually works.
However, what happened in my pipeline was different: when errors started
happening due to my code, some messages were never processed, and some were
actually lost forever.

I would expect something like:

{...}
22
23
24
Failed ... 25
{A new reader starts}
Reader-0: first record offset 60
61
62
{...}
{Dataflow retries 25}
Failed ... 25
{...}
and so on... (exception would never cease to happen in this case and
Dataflow would retry forever)

My output was something like:

{...}
22
23
24
Failed ... 25
{A new reader starts}
Reader-0: first record offset 60
61
62
{...}

Message #25 never gets reprocessed, and all the messages up to 60 are lost,
probably the ones in the same processing bundle as 25. Even more curious is
that this behaviour doesn't happen when using the PubsubIO source, which
produces the first output shown above.

My questions are:
What is a good way of handling errors with the Kafka source if I want all
messages to be processed exactly once?
Is there any Kafka or Dataflow configuration that I may be missing?
Please let me know your thoughts.

Andre (cc) is part of our team and will be joining this discussion.

-- 
[]s

Leonardo Alves Miguel
Data Engineer
(16) 3509- | www.arquivei.com.br



Re: Python-Beam input from Port

2018-08-15 Thread Juan Carlos Garcia
The way I see it, a SocketIO.read() implementation should just emit String
(or byte[]) values back. So create a PTransform that opens the port and
then, from its expand() method, wires in a loop that reads the String (or
byte[]) data and emits the values into the pipeline (returning a
PCollection).
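
For example, a minimal sketch along those lines (the SocketRead name, host,
and port are hypothetical; a production-ready version would use an unbounded
source or splittable DoFn rather than a plain ParDo):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical "SocketRead" transform: connects to host:port and emits each
// line received on the socket as a String element.
public class SocketRead extends PTransform<PBegin, PCollection<String>> {
  private final String host;
  private final int port;

  public SocketRead(String host, int port) {
    this.host = host;
    this.port = port;
  }

  @Override
  public PCollection<String> expand(PBegin input) {
    // Seed the pipeline with a single element, then run the socket-reading
    // loop inside a DoFn.
    return input
        .apply(Create.of("seed"))
        .apply(ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws Exception {
            try (Socket socket = new Socket(host, port);
                BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()))) {
              String line;
              while ((line = reader.readLine()) != null) {
                c.output(line); // emit each line read from the port
              }
            }
          }
        }));
  }
}

// Usage: p.apply(new SocketRead("localhost", 9999)).apply(...);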

Hope it gives you an idea.

On Wed, Aug 15, 2018 at 12:55 AM, Akshay Balwally 
wrote:

> Anyone know how Beam can take input from a port (or some other testable
> timed input?)
> https://stackoverflow.com/questions/51850595/apache-beam-input-from-ports
>
> --
> Akshay Balwally
> Software Engineer
> 9372716469 |
>
> 
>



-- 

JC