Re: ElasticsearchIO bulk delete

2018-07-30 Thread Wout Scheepers
Hey Tim,

Thanks for your proposal to mentor me through my first PR.
As we’re definitely planning to upgrade to ES6 when Beam supports it, we 
decided to postpone the feature (we have a fix that works for us, for now).
When Beam supports ES6, I’ll be happy to make a contribution to get bulk 
deletes working.

For reference, I opened a ticket 
(https://issues.apache.org/jira/browse/BEAM-5042).

Cheers,
Wout


From: Tim Robertson 
Reply-To: "user@beam.apache.org" 
Date: Friday, 27 July 2018 at 17:43
To: "user@beam.apache.org" 
Subject: Re: ElasticsearchIO bulk delete

Hi Wout,

This is great, thank you. I wrote the partial update support you reference and 
I'll be happy to mentor you through your first PR - welcome aboard. Can you 
please open a Jira to reference this work and we'll assign it to you?

We discussed having the "_xxx" fields in the document and triggering actions 
based on that in the partial update jira but opted to avoid it. Based on that 
discussion the ActionFn would likely be the preferred approach.  Would that be 
possible?

It will be important to provide unit and integration tests as well.

Please be aware that there is a branch and work underway for ES6 already which 
is rather different on the write() path so this may become redundant rather 
quickly.

Thanks,
Tim

@timrobertson100 on the Beam slack channel



On Fri, Jul 27, 2018 at 2:53 PM, Wout Scheepers 
mailto:wout.scheep...@vente-exclusive.com>> 
wrote:
Hey all,

A while ago, I patched ElasticsearchIO to be able to do partial updates and 
deletes.
However, I did not consider my patch pull-request-worthy as the json parsing 
was done inefficient (parsed it twice per document).

Since Beam 2.5.0 partial updates are supported, so the only thing I’m missing 
is the ability to send bulk delete requests.
We’re using entity updates for event sourcing in our data lake and need to 
persist deleted entities in elastic.
We’ve been using my patch in production for the last year, but I would like to 
contribute to get the functionality we need into one of the next releases.

I’ve created a gist that works for me, but is still inefficient (parsing twice: 
once to check the ‘_action` field, once to get the metadata).
Each document I want to delete needs an additional ‘_action’ field with the 
value ‘delete’. It doesn’t matter the document still contains the redundant 
field, as the delete action only requires the metadata.
I’ve added the method isDelete() and made some changes to the processElement() 
method.
https://gist.github.com/wscheep/26cca4bda0145ffd38faf7efaf2c21b9

I would like to make my solution more generic to fit into the current 
ElasticsearchIO and create a proper pull request.
As this would be my first pull request for beam, can anyone point me in the 
right direction before I spent too much time creating something that will be 
rejected?

Some questions on the top of my mind are:

  *   Is it a good idea it to make the ‘action’ part for the bulk api generic?
  *   Should it be even more generic? (e.g.: set an ‘ActionFn’ on the 
ElasticsearchIO)
  *   If I want to avoid parsing twice, the parsing should be done outside of 
the getDocumentMetaData() method. Would this be acceptable?
  *   Is it possible to avoid passing the action as a field in the document?
  *   Is there another or better way to get the delete functionality in general?

All feedback is more than welcome.

Cheers,
Wout





A windows/trigger Question with kafkaIO over Spark Runner

2018-07-30 Thread linrick
Dear all

I have a question about the use of windows/triggers.
The following versions of related tools are set in my running program:
==
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.3.1 (local mode)
Kafka: 2.11-0.10.1.1
scala: 2.11.8
java: 1.8
==
My programs (KafkaToKafka.java and StarterPipeline.java) are as shown on 
Github, as: https://github.com/LinRick/beamkafkaIO
The configuration setting of Kafka broker is:
==
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 1000 \
--record-size 100 \
--topic kafkasink \
--throughput 1 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000
==

The display of Kafka broker on console is as:
==
...
49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 
max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 
m ax latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 
m ax latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 
m ax latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 
m ax latency.
...
==

We hope that there are about 10,000 in each window every second by the 
following settings in my program StarterPipeline.java:
==
…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).
withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection> readData = p.apply(KafkaIO.read()
  .withBootstrapServers("ubuntu7:9092")
  .withTopic("kafkasink")
  .withKeyDeserializer(IntegerDeserializer.class)
  .withValueDeserializer(StringDeserializer.class)
  //.withMaxNumRecords(50)
  .withoutMetadata());

PCollection> readData1 = readData.
  apply(Window.>into(FixedWindows.of(Duration.standardSeconds(1)))
.triggering(AfterWatermark.pastEndOfWindow()
  
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
…
==
The processed data will be imported into PostgresSQL.
The display of results in DB is shown as follows.
224  3000
225  0
226  3000
227  0
228  0
236  0
237  0
238  5000

Unfortunately, results that we are looking forward to is:
224  9000
225  11000
226  9505
227  9829
228  10001

I do not know how to deal with this situation that maybe is about data latency?

1.In addition, I am not sure if this issue is about kafkaIO or I was wrong 
with settings of spark runner? as the issue 
BEAM-4632

If any further information is needed, I am glad to be informed and will provide 
to you as soon as possible.

I will highly appreciate it if you can help me to overcome this.

I am looking forward to hearing from you.

Sincerely yours,

Rick


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain 
confidential information. Please do not use or disclose it in any way and 
delete it if you are not the intended recipient.


RE: A windows/trigger Question with kafkaIO over Spark Runner

2018-07-30 Thread Nicolas Viard
Hello,

I think Spark has a default windowing strategy and pulls data from kafka every 
X ms.

You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).

Best regards,

Nicolas


De : linr...@itri.org.tw 
Envoyé : lundi 30 juillet 2018 10:58:26
À : user@beam.apache.org
Objet : A windows/trigger Question with kafkaIO over Spark Runner


Dear all



I have a question about the use of windows/triggers.

The following versions of related tools are set in my running program:

==

Beam 2.4.0 (Direct runner and Spark runner)

Spark 2.3.1 (local mode)

Kafka: 2.11-0.10.1.1

scala: 2.11.8

java: 1.8

==

My programs (KafkaToKafka.java and StarterPipeline.java) are as shown on 
Github, as: https://github.com/LinRick/beamkafkaIO

The configuration setting of Kafka broker is:

==

/kafka_broker/bin/kafka-producer-perf-test.sh \

--num-records 1000 \

--record-size 100 \

--topic kafkasink \

--throughput 1 \

--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000

==



The display of Kafka broker on console is as:

==

...

49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 
max latency.

50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 
m ax latency.

50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 
m ax latency.

50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 
m ax latency.

50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 
m ax latency.

...

==



We hope that there are about 10,000 in each window every second by the 
following settings in my program StarterPipeline.java:

==

…

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).

withValidation().as(SparkPipelineOptions.class);

options.setRunner(SparkRunner.class);

Pipeline p = Pipeline.create(options);

options.setMaxRecordsPerBatch(1000L);

options.setSparkMaster("local[*]");



PCollection> readData = p.apply(KafkaIO.read()

  .withBootstrapServers("ubuntu7:9092")

  .withTopic("kafkasink")

  .withKeyDeserializer(IntegerDeserializer.class)

  .withValueDeserializer(StringDeserializer.class)

  //.withMaxNumRecords(50)

  .withoutMetadata());



PCollection> readData1 = readData.

  apply(Window.>into(FixedWindows.of(Duration.standardSeconds(1)))

.triggering(AfterWatermark.pastEndOfWindow()

  
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))

.withAllowedLateness(Duration.ZERO)

.discardingFiredPanes());

…

==

The processed data will be imported into PostgresSQL.

The display of results in DB is shown as follows.

224  3000

225  0

226  3000

227  0

228  0

236  0

237  0

238  5000



Unfortunately, results that we are looking forward to is:

224  9000

225  11000

226  9505

227  9829

228  10001



I do not know how to deal with this situation that maybe is about data latency?



1.In addition, I am not sure if this issue is about kafkaIO or I was wrong 
with settings of spark runner? as the issue 
BEAM-4632



If any further information is needed, I am glad to be informed and will provide 
to you as soon as possible.



I will highly appreciate it if you can help me to overcome this.



I am looking forward to hearing from you.



Sincerely yours,



Rick


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain 
confidential information. Please do not use or disclose it in any way and 
delete it if you are not the intended recipient.


Re: ElasticsearchIO bulk delete

2018-07-30 Thread Tim Robertson
> we decided to postpone the feature

That makes sense.

I believe the ES6 branch is in-part working (I've looked at the code but
not used it) which you can see here [1] and the jira to watch or contribute
is [2]. It would be a useful addition to test independently and report any
observations or improvement requests on that jira.

The offer to assist in your first PR remains open for the future - please
don't hesitate to ask.

Thanks,
Tim

[1]
https://github.com/jsteggink/beam/tree/BEAM-3199/sdks/java/io/elasticsearch-6/src/main/java/org/apache/beam/sdk/io/elasticsearch
[2] https://issues.apache.org/jira/browse/BEAM-3199

On Mon, Jul 30, 2018 at 10:55 AM, Wout Scheepers <
wout.scheep...@vente-exclusive.com> wrote:

> Hey Tim,
>
>
>
> Thanks for your proposal to mentor me through my first PR.
>
> As we’re definitely planning to upgrade to ES6 when Beam supports it, we
> decided to postpone the feature (we have a fix that works for us, for now).
>
> When Beam supports ES6, I’ll be happy to make a contribution to get bulk
> deletes working.
>
>
>
> For reference, I opened a ticket (https://issues.apache.org/
> jira/browse/BEAM-5042).
>
>
>
> Cheers,
>
> Wout
>
>
>
>
>
> *From: *Tim Robertson 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Friday, 27 July 2018 at 17:43
> *To: *"user@beam.apache.org" 
> *Subject: *Re: ElasticsearchIO bulk delete
>
>
>
> Hi Wout,
>
>
>
> This is great, thank you. I wrote the partial update support you reference
> and I'll be happy to mentor you through your first PR - welcome aboard. Can
> you please open a Jira to reference this work and we'll assign it to you?
>
>
>
> We discussed having the "_xxx" fields in the document and triggering
> actions based on that in the partial update jira but opted to avoid
> it. Based on that discussion the ActionFn would likely be the preferred
> approach.  Would that be possible?
>
>
>
> It will be important to provide unit and integration tests as well.
>
>
>
> Please be aware that there is a branch and work underway for ES6 already
> which is rather different on the write() path so this may become redundant
> rather quickly.
>
>
>
> Thanks,
>
> Tim
>
>
>
> @timrobertson100 on the Beam slack channel
>
>
>
>
>
>
>
> On Fri, Jul 27, 2018 at 2:53 PM, Wout Scheepers  exclusive.com> wrote:
>
> Hey all,
>
>
>
> A while ago, I patched ElasticsearchIO to be able to do partial updates
> and deletes.
>
> However, I did not consider my patch pull-request-worthy as the json
> parsing was done inefficient (parsed it twice per document).
>
>
>
> Since Beam 2.5.0 partial updates are supported, so the only thing I’m
> missing is the ability to send bulk *delete* requests.
>
> We’re using entity updates for event sourcing in our data lake and need to
> persist deleted entities in elastic.
>
> We’ve been using my patch in production for the last year, but I would
> like to contribute to get the functionality we need into one of the next
> releases.
>
>
>
> I’ve created a gist that works for me, but is still inefficient (parsing
> twice: once to check the ‘_action` field, once to get the metadata).
>
> Each document I want to delete needs an additional ‘_action’ field with
> the value ‘delete’. It doesn’t matter the document still contains the
> redundant field, as the delete action only requires the metadata.
>
> I’ve added the method isDelete() and made some changes to the
> processElement() method.
>
> https://gist.github.com/wscheep/26cca4bda0145ffd38faf7efaf2c21b9
>
>
>
> I would like to make my solution more generic to fit into the current
> ElasticsearchIO and create a proper pull request.
>
> As this would be my first pull request for beam, can anyone point me in
> the right direction before I spent too much time creating something that
> will be rejected?
>
>
>
> Some questions on the top of my mind are:
>
>- Is it a good idea it to make the ‘action’ part for the bulk api
>generic?
>- Should it be even more generic? (e.g.: set an ‘ActionFn’ on the
>ElasticsearchIO)
>- If I want to avoid parsing twice, the parsing should be done outside
>of the getDocumentMetaData() method. Would this be acceptable?
>- Is it possible to avoid passing the action as a field in the
>document?
>- Is there another or better way to get the delete functionality in
>general?
>
>
>
> All feedback is more than welcome.
>
>
> Cheers,
> Wout
>
>
>
>
>
>
>


Re: Beam SparkRunner and Spark KryoSerializer problem

2018-07-30 Thread Juan Carlos Garcia
Bump!

Does any of the core-dev roam around here?

Can someone provide a feedback about BEAM-4597


Thanks and regards,

On Thu, Jul 19, 2018 at 3:41 PM, Juan Carlos Garcia 
wrote:

> Folks,
>
> Its someone using the SparkRunner out there with the Spark KryoSerializer ?
>
> We are being force to use the not so efficient 'JavaSerializer' with Spark
> because we face the following exception:
>
> 
> Exception in thread "main" java.lang.RuntimeException: 
> org.apache.spark.SparkException:
> Job aborted due to stage failure: Exception while getting task result:
> com.esotericsoftware.kryo.KryoException: Unable to find class:
> org.apache.beam.runners.core.metrics.MetricsContainerImpl$$
> Lambda$31/1875283985
> Serialization trace:
> factory (org.apache.beam.runners.core.metrics.MetricsMap)
> counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
> metricsContainers (org.apache.beam.runners.core.metrics.
> MetricsContainerStepMap)
> metricsContainers (org.apache.beam.runners.spark.io.SparkUnboundedSource$
> Metadata)
> at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(
> SparkPipelineResult.java:55)
> at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(
> SparkPipelineResult.java:72)
> at org.apache.beam.runners.spark.SparkPipelineResult.access$
> 000(SparkPipelineResult.java:41)
> at org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(
> SparkPipelineResult.java:163)
> at org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(
> SparkPipelineResult.java:198)
> at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(
> SparkPipelineResult.java:101)
> at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(
> SparkPipelineResult.java:87)
> at org.apache.beam.examples.BugWithKryoOnSpark.main(
> BugWithKryoOnSpark.java:75)
> 
>
> I created a jira ticket and attached a project example on it,
> https://issues.apache.org/jira/browse/BEAM-4597
>
> Any feedback is appreciated.
>
> --
>
> JC
>
>


-- 

JC


Re: Beam SparkRunner and Spark KryoSerializer problem

2018-07-30 Thread Jean-Baptiste Onofré
Hi Juan,

it seems that has been introduce by the metrics layer in the core runner
API.

Let me check.

Regards
JB

On 30/07/2018 14:47, Juan Carlos Garcia wrote:
> Bump!
> 
> Does any of the core-dev roam around here?
> 
> Can someone provide a feedback about BEAM-4597
> 
> 
> Thanks and regards,
> 
> On Thu, Jul 19, 2018 at 3:41 PM, Juan Carlos Garcia  > wrote:
> 
> Folks,
> 
> Its someone using the SparkRunner out there with the Spark
> KryoSerializer ?
> 
> We are being force to use the not so efficient 'JavaSerializer' with
> Spark because we face the following exception:
> 
> 
> Exception in thread "main" java.lang.RuntimeException:
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Exception while getting task result:
> com.esotericsoftware.kryo.KryoException: Unable to find class:
> 
> org.apache.beam.runners.core.metrics.MetricsContainerImpl$$Lambda$31/1875283985
> Serialization trace:
> factory (org.apache.beam.runners.core.metrics.MetricsMap)
> counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
> metricsContainers
> (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
> metricsContainers
> (org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
> at
> 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
> at
> 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
> at
> 
> org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
> at
> 
> org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
> at
> 
> org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
> at
> 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
> at
> 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
> at
> 
> org.apache.beam.examples.BugWithKryoOnSpark.main(BugWithKryoOnSpark.java:75)
> 
> 
> I created a jira ticket and attached a project example on it,
> https://issues.apache.org/jira/browse/BEAM-4597
> 
> 
> Any feedback is appreciated.
> 
> -- 
> 
> JC 
> 
> 
> 
> 
> -- 
> 
> JC 
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Filtering data using external source.

2018-07-30 Thread Jose Bermeo
Hi, question guys.

I have to filter an unbounded collection based on data from a redshift DB.
I cannot use a side input as redshift data could change. One way to do it
would be to group common elements, make a query to filter each group,
finally flatten the pipe again.Do you know if this is the best way to do
it? and what would be the way to run the query agains redshift?.

Thaks.


Re: Beam SparkRunner and Spark KryoSerializer problem

2018-07-30 Thread Juan Carlos Garcia
Hi Jean,

Thanks for taking a look.


On Mon, Jul 30, 2018 at 2:49 PM, Jean-Baptiste Onofré 
wrote:

> Hi Juan,
>
> it seems that has been introduce by the metrics layer in the core runner
> API.
>
> Let me check.
>
> Regards
> JB
>
> On 30/07/2018 14:47, Juan Carlos Garcia wrote:
> > Bump!
> >
> > Does any of the core-dev roam around here?
> >
> > Can someone provide a feedback about BEAM-4597
> > 
> >
> > Thanks and regards,
> >
> > On Thu, Jul 19, 2018 at 3:41 PM, Juan Carlos Garcia  > > wrote:
> >
> > Folks,
> >
> > Its someone using the SparkRunner out there with the Spark
> > KryoSerializer ?
> >
> > We are being force to use the not so efficient 'JavaSerializer' with
> > Spark because we face the following exception:
> >
> > 
> > Exception in thread "main" java.lang.RuntimeException:
> > org.apache.spark.SparkException: Job aborted due to stage failure:
> > Exception while getting task result:
> > com.esotericsoftware.kryo.KryoException: Unable to find class:
> > org.apache.beam.runners.core.metrics.MetricsContainerImpl$$
> Lambda$31/1875283985
> > Serialization trace:
> > factory (org.apache.beam.runners.core.metrics.MetricsMap)
> > counters (org.apache.beam.runners.core.metrics.MetricsContainerImpl)
> > metricsContainers
> > (org.apache.beam.runners.core.metrics.MetricsContainerStepMap)
> > metricsContainers
> > (org.apache.beam.runners.spark.io.SparkUnboundedSource$Metadata)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.
> runtimeExceptionFrom(SparkPipelineResult.java:55)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(
> SparkPipelineResult.java:72)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.access$
> 000(SparkPipelineResult.java:41)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult$
> StreamingMode.stop(SparkPipelineResult.java:163)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(
> SparkPipelineResult.java:198)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(
> SparkPipelineResult.java:101)
> > at
> > org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(
> SparkPipelineResult.java:87)
> > at
> > org.apache.beam.examples.BugWithKryoOnSpark.main(
> BugWithKryoOnSpark.java:75)
> > 
> >
> > I created a jira ticket and attached a project example on it,
> > https://issues.apache.org/jira/browse/BEAM-4597
> > 
> >
> > Any feedback is appreciated.
> >
> > --
> >
> > JC
> >
> >
> >
> >
> > --
> >
> > JC
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>



-- 

JC


Re: Filtering data using external source.

2018-07-30 Thread Jean-Baptiste Onofré
Hi Jose,

so basically, you create two PCollections with the same keys and then
you join/filter/flatten ?

Regards
JB

On 30/07/2018 15:09, Jose Bermeo wrote:
> Hi, question guys.
> 
> I have to filter an unbounded collection based on data from a redshift
> DB. I cannot use a side input as redshift data could change. One way to
> do it would be to group common elements, make a query to filter each
> group, finally flatten the pipe again.Do you know if this is the best
> way to do it? and what would be the way to run the query agains redshift?.
> 
> Thaks.

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Regression of (BEAM-2277) - IllegalArgumentException when using Hadoop file system for WordCount example.

2018-07-30 Thread Juan Carlos Garcia
Hi Folks,

I experienced the issued described in (BEAM-2277
), which shows it was
fixed by v2.0.0

However using version 2.4.0 and 2.6.0 (another user reported it) shows the
same error.

So either it was not 100% fixed, or the bug appeared again.

Thanks and Regards
-- 

JC


Re: Filtering data using external source.

2018-07-30 Thread Chamikara Jayalath
One solution will be to stabilize data read from redshift DB. To this end,
sending your side input through a Reshuffle transform [1] should work for
some runners. Robin is working on a more portable solution for supporting
stable input [2].

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L64
[2]
https://lists.apache.org/thread.html/f8093ad5512a7fce668550e1f9cf0921c5d1e7ff6656c7a6c9950165@%3Cdev.beam.apache.org%3E


On Mon, Jul 30, 2018 at 6:16 AM Jean-Baptiste Onofré 
wrote:

> Hi Jose,
>
> so basically, you create two PCollections with the same keys and then
> you join/filter/flatten ?
>
> Regards
> JB
>
> On 30/07/2018 15:09, Jose Bermeo wrote:
> > Hi, question guys.
> >
> > I have to filter an unbounded collection based on data from a redshift
> > DB. I cannot use a side input as redshift data could change. One way to
> > do it would be to group common elements, make a query to filter each
> > group, finally flatten the pipe again.Do you know if this is the best
> > way to do it? and what would be the way to run the query agains
> redshift?.
> >
> > Thaks.
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Filtering data using external source.

2018-07-30 Thread Jose Bermeo
Hi JB.

I'm not sure, I could create two PCollections, the question is how do I
make the PCollection from Redshift reflect the changes in the table? To
refrease my initial question, each element in my PCollection has a
foreing_key_id, I need to check if the row associated with the foreing_key_id
in redshift is valid? issue is that my PCollection is unbound (new elements
with different  foreing_key_id can show) and redshift table is also
changing.

Regards

On Mon, 30 Jul 2018 at 08:16, Jean-Baptiste Onofré  wrote:

> Hi Jose,
>
> so basically, you create two PCollections with the same keys and then
> you join/filter/flatten ?
>
> Regards
> JB
>
> On 30/07/2018 15:09, Jose Bermeo wrote:
> > Hi, question guys.
> >
> > I have to filter an unbounded collection based on data from a redshift
> > DB. I cannot use a side input as redshift data could change. One way to
> > do it would be to group common elements, make a query to filter each
> > group, finally flatten the pipe again.Do you know if this is the best
> > way to do it? and what would be the way to run the query agains
> redshift?.
> >
> > Thaks.
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Live coding & reviewing adventures

2018-07-30 Thread Holden Karau
So small schedule changes.
I’ll be doing some poking at the Go SDK at 2pm today -
https://www.youtube.com/watch?v=9UAu1DOZJhM and the one with Gris setting
up Beam on a new machine will be moved to Friday because her laptop got
delayed - https://www.youtube.com/watch?v=x8Wg7qCDA5k

On Tue, Jul 24, 2018 at 8:41 PM Holden Karau  wrote:

> I'll be doing this again this week & next looking at a few different
> topics.
>
> Tomorrow (July 25th @ 10am pacific) Gris & I will be updating the PR from
> my last live stream (adding Python dependency handling) -
> https://www.twitch.tv/events/P92irbgYR9Sx6nMQ-lGY3g /
> https://www.youtube.com/watch?v=4xDsY5QL2zM
>
> In the afternoon @ 3 pm pacific I'll be looking at the dev tools we've had
> some discussions around with respect to reviews - https://www.twitch.tv/
> events/vNzcZ7DdSuGFNYURW_9WEQ / https://www.youtube.com/
> watch?v=6cTmC_fP9B0
>
> Next week on Thursday August 1st @ 2pm pacific Gris & I will be setting up
> Beam on her new laptop together, so for any new users looking to see how to
> install Beam from source this one is for you (or for devs looking to see
> how painful set up is) - https://www.twitch.tv/
> events/YAYvNp3tT0COkcpNBxnp6A / https://www.youtube.com/watch?
> v=x8Wg7qCDA5k
>
> P.S.
>
> As always I'll be doing my regular Friday code reviews in Spark -
> https://www.youtube.com/watch?v=O4rRx-3PTiM . You can see the other ones
> I have planned on my twitch  events
>  and youtube
> .
>
> On Fri, Jul 13, 2018 at 11:54 AM, Holden Karau 
> wrote:
>
>> Hi folks! I've been doing some live coding in my other projects and I
>> figured I'd do some with Apache Beam as well.
>>
>> Today @ 3pm pacific I'm going be doing some impromptu exploration better
>> review tooling possibilities (looking at forking spark-pr-dashboard for
>> other projects like beam and setting up mentionbot to work with ASF infra)
>> - https://www.youtube.com/watch?v=ff8_jbzC8JI
>>
>> Next week (Thursday the 19th at 2pm pacific) I'm going to be working on
>> trying to get easier dependency management for the Python portable runner
>> in place - https://www.youtube.com/watch?v=Sv0XhS2pYqA
>>
>> If your interested in seeing more of the development process I hope you
>> will join me :)
>>
>> P.S.
>>
>> You can also follow on twitch which does a better job of notifications
>> https://www.twitch.tv/holdenkarau
>>
>> Also one of the other thing I do is "live reviews" of PRs but they are
>> generally opt-in and I don't have enough opt-ins from the Beam community to
>> do live reviews in Beam, if you work on Beam and would be OK with me doing
>> a live streamed review of your PRs let me know (if your curious to what
>> they look like you can see some of them here in Spark land
>> 
>> ).
>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>


delayed emit (timer) in py-beam?

2018-07-30 Thread Austin Bennett
What's going on with timers and python?

Am looking at building a pipeline (assuming another group in my company
will grant access to the Kafka topic):

Kafka -> beam -> have beam wait 24 hours -> do transform(s) and emit a
record.  If I read things correctly that's not currently possible in python
on beam.  What all is needed?  (trying to figure out whether that is
something that I am capable of and there is room for me to help with).
Looking for similar functionality to https://www.rabbitmq.com/blog/
2015/04/16/scheduling-messages-with-rabbitmq/ (though don't need alternate
routing, nor is that example in python).


For example, I see:  https://beam.apache.org/blog/
2017/08/28/timely-processing.html

and tickets like:  https://issues.apache.org/jira/browse/BEAM-4594


Re: delayed emit (timer) in py-beam?

2018-07-30 Thread Charles Chen
Hey Austin,

This API is not yet implemented in the Python SDK.  I am working on this
feature:  the next step from my end is to finish a reference implementation
in the local DirectRunner.  As you note, the doc at
https://s.apache.org/beam-python-user-state-and-timers describes the design.

You can track progress on the mailing list thread here:
https://lists.apache.org/thread.html/51ba1a00027ad8635bc1d2c0df805ce873995170c75d6a08dfe21997@%3Cdev.beam.apache.org%3E

Best,
Charles

On Mon, Jul 30, 2018 at 3:34 PM Austin Bennett 
wrote:

> What's going on with timers and python?
>
> Am looking at building a pipeline (assuming another group in my company
> will grant access to the Kafka topic):
>
> Kafka -> beam -> have beam wait 24 hours -> do transform(s) and emit a
> record.  If I read things correctly that's not currently possible in python
> on beam.  What all is needed?  (trying to figure out whether that is
> something that I am capable of and there is room for me to help with).
> Looking for similar functionality to
> https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
> (though don't need alternate routing, nor is that example in python).
>
>
> For example, I see:
> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>
> and tickets like:  https://issues.apache.org/jira/browse/BEAM-4594
>
>
>


Re: delayed emit (timer) in py-beam?

2018-07-30 Thread Austin Bennett
Fantastic; thanks, Charles!



On Mon, Jul 30, 2018 at 3:49 PM, Charles Chen  wrote:

> Hey Austin,
>
> This API is not yet implemented in the Python SDK.  I am working on this
> feature:  the next step from my end is to finish a reference implementation
> in the local DirectRunner.  As you note, the doc at
> https://s.apache.org/beam-python-user-state-and-timers describes the
> design.
>
> You can track progress on the mailing list thread here:
> https://lists.apache.org/thread.html/51ba1a00027ad8635bc1d2c0df805c
> e873995170c75d6a08dfe21997@%3Cdev.beam.apache.org%3E
>
> Best,
> Charles
>
> On Mon, Jul 30, 2018 at 3:34 PM Austin Bennett <
> whatwouldausti...@gmail.com> wrote:
>
>> What's going on with timers and python?
>>
>> Am looking at building a pipeline (assuming another group in my company
>> will grant access to the Kafka topic):
>>
>> Kafka -> beam -> have beam wait 24 hours -> do transform(s) and emit a
>> record.  If I read things correctly that's not currently possible in python
>> on beam.  What all is needed?  (trying to figure out whether that is
>> something that I am capable of and there is room for me to help with).
>> Looking for similar functionality to https://www.rabbitmq.com/blog/
>> 2015/04/16/scheduling-messages-with-rabbitmq/ (though don't need
>> alternate routing, nor is that example in python).
>>
>>
>> For example, I see:  https://beam.apache.org/blog/
>> 2017/08/28/timely-processing.html
>>
>> and tickets like:  https://issues.apache.org/jira/browse/BEAM-4594
>>
>>
>>


RE: A windows/trigger Question with kafkaIO over Spark Runner

2018-07-30 Thread linrick
Dear Nicolas,

Yes, I have set this configure, as

Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection> readData1 = readData.
apply(Window.>into(FixedWindows.of(Duration.standardSeconds(1)))
  .triggering(AfterWatermark.pastEndOfWindow()

.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
  .withAllowedLateness(Duration.ZERO)
  .discardingFiredPanes());

However, the result will show in the following.
“1158  1000
11590
11600
11610
11620
11630
11641000
11650
11660
11670
11680
11690
11700
….”

Rick

From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Monday, July 30, 2018 5:35 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner


Hello,

I think Spark has a default windowing strategy and pulls data from kafka every 
X ms.

You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).

Best regards,

Nicolas


De : linr...@itri.org.tw 
mailto:linr...@itri.org.tw>>
Envoyé : lundi 30 juillet 2018 10:58:26
À : user@beam.apache.org
Objet : A windows/trigger Question with kafkaIO over Spark Runner


Dear all



I have a question about the use of windows/triggers.

The following versions of related tools are set in my running program:

==

Beam 2.4.0 (Direct runner and Spark runner)

Spark 2.3.1 (local mode)

Kafka: 2.11-0.10.1.1

scala: 2.11.8

java: 1.8

==

My programs (KafkaToKafka.java and StarterPipeline.java) are as shown on 
Github, as: https://github.com/LinRick/beamkafkaIO

The configuration setting of Kafka broker is:

==

/kafka_broker/bin/kafka-producer-perf-test.sh \

--num-records 1000 \

--record-size 100 \

--topic kafkasink \

--throughput 1 \

--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000

==



The display of Kafka broker on console is as:

==

...

49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 
max latency.

50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 
m ax latency.

50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 
m ax latency.

50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 
m ax latency.

50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 
m ax latency.

...

==



We hope that there are about 10,000 in each window every second by the 
following settings in my program StarterPipeline.java:

==

…

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).

withValidation().as(SparkPipelineOptions.class);

options.setRunner(SparkRunner.class);

Pipeline p = Pipeline.create(options);

options.setMaxRecordsPerBatch(1000L);

options.setSparkMaster("local[*]");



PCollection> readData = p.apply(KafkaIO.read()

  .withBootstrapServers("ubuntu7:9092")

  .withTopic("kafkasink")

  .withKeyDeserializer(IntegerDeserializer.class)

  .withValueDeserializer(StringDeserializer.class)

  //.withMaxNumRecords(50)

  .withoutMetadata());



PCollection> readData1 = readData.

  apply(Window.>into(FixedWindows.of(Duration.standardSeconds(1)))

.triggering(AfterWatermark.pastEndOfWindow()

  
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))

.withAllowedLateness(Duration.ZERO)

.discardingFiredPanes());

…

==

The processed data will be imported into PostgresSQL.

The display of results in DB is shown as follows.

224  3000

225  0

226  3000

227  0

228  0

236  0

237  0

238  5000



Unfortunately, results that we are looking forward to is:

224  9000

225  11000

226  9505

227  9829

228  10001



I do not know how to deal with this situation that maybe is about data latency?



1.In addition, I am not sure if this issue is about kafkaIO or I was wrong 
with settings of spark runner? as the issue 
BEAM-4632



If any further information is needed, I am glad to be informed and will provide 
to you as soon as possible.



I will highly appreciate it if you can help me to overcome this.



I am looking forward to hearing from you.



Sincerely yours,



Rick


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain 
confidential information. Please do not use or disclose it in any way and 
delete it if you are not the intended recipient.


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This emai