Re: Regarding the over window query support from Beam SQL

2021-03-05 Thread Rui Wang
I see. So the problem is that the alias does not appear in the output schema?

Based on your log, the logical plan contains "agg" as the alias, but the
physical plan (the BeamWindowRel) does not seem to carry the alias through.

I think it's worth opening a JIRA now to further investigate why the alias
does not pass through correctly. A good entry point for the investigation is
BeamWindowRel.
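
In the meantime, a possible workaround (a minimal sketch, untested; it assumes
only the field name is lost and the values come through in order, with `result`
being the PCollection<Row> produced by SqlTransform in your code) is to copy the
SQL output into a schema that uses the intended name:

Schema renamed = Schema.builder()
    .addInt32Field("foo")
    .addInt32Field("bar")
    .addInt64Field("agg")   // the SQL output currently exposes this as w0$o0 (INT64)
    .build();

PCollection<Row> withAlias = result
    .apply("RenameAggField",
        MapElements.into(TypeDescriptors.rows())
            .via((Row r) -> Row.withSchema(renamed).addValues(r.getValues()).build()))
    .setRowSchema(renamed);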

-Rui

On Fri, Mar 5, 2021 at 10:20 AM Tao Li  wrote:

> Hi Rui,
>
>
>
> Just following up on this issue. Do you think this is a bug? Is there a
> workaround? Thanks!
>
>
>
> *From: *Tao Li 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Tuesday, March 2, 2021 at 3:37 PM
> *To: *Rui Wang 
> *Cc: *"user@beam.apache.org" 
> *Subject: *Re: Regarding the over window query support from Beam SQL
>
>
>
> Hi Rui,
>
>
>
> Thanks for this info. It’s good to know we are already supporting the
> window function. But I still have a problem with the schema of the query
> result.
>
>
>
> This is my code (with Beam 2.28):
>
>
>
> Schema appSchema = Schema
>
> .builder()
>
> .addInt32Field("foo")
>
> .addInt32Field("bar")
>
> .build();
>
>
>
> Row rowOne = Row.withSchema(appSchema).addValues(1, 1).build();
>
> Row rowTwo = Row.withSchema(appSchema).addValues(1, 2).build();
>
>
>
> PCollection inputRows = executionContext.getPipeline()
>
> .apply(Create.of(rowOne, rowTwo))
>
> .setRowSchema(appSchema);
>
>
>
> String sql = "SELECT foo, bar, RANK() over (PARTITION BY foo
> ORDER BY bar) AS agg FROM PCOLLECTION";
>
> PCollection result  = inputRows.apply("sql",
> SqlTransform.query(sql));
>
>
>
> I can see the expected data from result, but I don’t see “agg” column in
> the schema. Do you have any ideas regarding this issue? Thanks!
>
>
>
>
>
> The Beam schema of the result is:
>
>
>
> Field{name=foo, description=, type=FieldType{typeName=INT32,
> nullable=false, logicalType=null, collectionElementType=null,
> mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}},
> options={{}}}
>
> Field{name=bar, description=, type=FieldType{typeName=INT32,
> nullable=false, logicalType=null, collectionElementType=null,
> mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}},
> options={{}}}
>
> Field{name=w0$o0, description=, type=FieldType{typeName=INT64,
> nullable=false, logicalType=null, collectionElementType=null,
> mapKeyType=null, mapValueType=null, rowSchema=null, metadata={}},
> options={{}}}
>
>
>
>
>
> Here are some detailed logs if they are helpful:
>
>
>
> [main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
> SQL:
>
> SELECT `PCOLLECTION`.`foo`, `PCOLLECTION`.`bar`, RANK() OVER (PARTITION BY
> `PCOLLECTION`.`foo` ORDER BY `PCOLLECTION`.`bar`) AS `agg`
>
> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
>
> [main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
> SQLPlan>
>
> LogicalProject(foo=[$0], bar=[$1], agg=[RANK() OVER (PARTITION BY $0 ORDER
> BY $1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
>
>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
>
>
> [main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner -
> BEAMPlan>
>
> BeamWindowRel(window#0=[window(partition {0} order by [1] range between
> UNBOUNDED PRECEDING and CURRENT ROW aggs [RANK()])])
>
>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>
>
>
> *From: *Rui Wang 
> *Date: *Tuesday, March 2, 2021 at 10:43 AM
> *To: *Tao Li 
> *Cc: *"user@beam.apache.org" 
> *Subject: *Re: Regarding the over window query support from Beam SQL
>
>
>
> Hi Tao,
>
>
>
> [1] contains what functions are working with OVER clause. Rank is one of
> the functions that is supported. Can you take a look?
>
>
>
>
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java

Re: Regarding the over window query support from Beam SQL

2021-03-02 Thread Rui Wang
Hi Tao,

[1] shows which functions work with the OVER clause. RANK is one of the
supported functions. Can you take a look?


[1]:
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java

-Rui

On Tue, Mar 2, 2021 at 9:24 AM Tao Li  wrote:

> + Rui Wang. Looks like Rui has been working on this jira.
>
>
>
>
>
> *From: *Tao Li 
> *Date: *Monday, March 1, 2021 at 9:51 PM
> *To: *"user@beam.apache.org" 
> *Subject: *Regarding the over window query support from Beam SQL
>
>
>
> Hi Beam community,
>
>
>
> Querying over a window for ranking etc is pretty common in SQL use cases.
> I have found this jira https://issues.apache.org/jira/browse/BEAM-9198
>
>
>
> Do we have a plan to support this? If there is no such plan in near
> future, are Beam developers supposed to implement this function on their
> own (e.g. by using GroupBy)? Thanks!
>


Re: Apache Beam SQL and UDF

2021-02-10 Thread Rui Wang
The problem I can think of is that the async call may not complete before
the UDF's life cycle has reached its end.
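
For reference, a minimal sketch of the scalar-UDF shape being discussed (the
class, function, and column names are hypothetical, and the body is only a
placeholder for the internal-state lookup):

public class MatchesInternalState implements BeamSqlUdf {
  // Called once per row during expression evaluation; any internal state
  // maintained by a background thread must outlive these calls.
  public static Boolean eval(String key, String value) {
    return key != null && key.equals(value);  // placeholder for the state lookup
  }
}

PCollection<Row> filtered = rows.apply(
    SqlTransform.query("SELECT * FROM PCOLLECTION WHERE matches_state(f1, f2)")
        .registerUdf("matches_state", MatchesInternalState.class));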


-Rui

On Wed, Feb 10, 2021 at 12:34 PM Talat Uyarer 
wrote:

> Hi,
>
> We plan to use UDF on our sql. We want to achieve some kind of
> filtering based on internal states. We want to update that internal state
> with a separate async thread in UDF. Before implementing that thing I want
> to get your options. Is there any limitation for UDF to have multi-thread
> implementation ?  Our UDF is a scalar function. It will get 1 or 2 input
> and return boolean.
>
> I will appreciate your comments in advance.
>
> Thanks
>


Re: Beam SQL UDF with variable arguments list?

2021-01-26 Thread Rui Wang
Yes, I think Calcite does not support varargs for scalar functions (and
therefore not in UDFs). Please check this JIRA:
https://issues.apache.org/jira/browse/CALCITE-2772


-Rui

On Tue, Jan 26, 2021 at 2:04 AM Niels Basjes  wrote:

> Hi,
>
> I want to define a Beam SQL user defined function that accepts a variable
> list of arguments (which may be empty).
>
> What I essentially would like to have is
>
> public class ParseUserAgentJson implements BeamSqlUdf {
>
>public static String eval( String input,
>   String... fields) { ... }
>
> }
>
> When I do this I get this on the case where the list is empty
>
> *Caused by:
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorException:
> No match found for function signature ParseUserAgentJson()*
> * at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)*
> * at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)*
> * at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
> * at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)*
>
>
> So I tried
> public static String eval( @Parameter(name = "userAgent") String input,
>
>@Parameter(name = "Fields", optional = true) 
> String ... fields
> )
>
> Which gives
>
>
> *java.lang.AssertionError: No assign rules for OTHER defined*
>
> * at
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeAssignmentRules.canCastFrom(SqlTypeAssignmentRules.java:389)*
> * at
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeUtil.canCastFrom(SqlTypeUtil.java:864)*
> * at
> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlUtil.lambda$filterRoutinesByParameterType$4(SqlUtil.java:620)*
>
>
> It seems this is Calcite that is unable to do the "Variable arguments"
> trick.
>
> So right now I have this workaround which works but limits it to a maximum
> of 10 arguments:
>
> public static String eval( // NOSONAR java:S107 Methods should not have
> too many parameters
>
> @Parameter(name = "userAgent") String input,
> @Parameter(name = "Field  1", optional = true) String field1,
> @Parameter(name = "Field  2", optional = true) String field2,
> @Parameter(name = "Field  3", optional = true) String field3,
> @Parameter(name = "Field  4", optional = true) String field4,
> @Parameter(name = "Field  5", optional = true) String field5,
> @Parameter(name = "Field  6", optional = true) String field6,
> @Parameter(name = "Field  7", optional = true) String field7,
> @Parameter(name = "Field  8", optional = true) String field8,
> @Parameter(name = "Field  9", optional = true) String field9,
> @Parameter(name = "Field 10", optional = true) String field10
> ) {
>
>
> My question: Is there a better way to do this?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: [ANNOUNCE] Beam 2.25.0 Released

2020-10-26 Thread Rui Wang
Thank you Robin!


-Rui

On Mon, Oct 26, 2020 at 11:44 AM Pablo Estrada  wrote:

> Thanks Robin!
>
> On Mon, Oct 26, 2020 at 11:06 AM Robin Qiu  wrote:
>
>> The Apache Beam team is pleased to announce the release of version 2.25.0.
>>
>> Apache Beam is an open source unified programming model to define and
>> execute data processing pipelines, including ETL, batch and stream
>> (continuous) processing. See: https://beam.apache.org
>>
>> You can download the release here:
>> https://beam.apache.org/get-started/downloads/
>>
>> This release includes bug fixes, features, and improvements detailed on
>> the Beam blog: https://beam.apache.org/blog/beam-2.25.0/
>>
>> Thanks to everyone who contributed to this release, and we hope you enjoy
>> using Beam 2.25.0.
>>
>


Re: How to integrate Beam SQL windowing query with KafkaIO?

2020-08-27 Thread Rui Wang
Glad it worked! So it sounds like the data was being dropped because it was
considered late, and `.withAllowedLateness()` allowed it to be emitted.
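
For anyone hitting the same symptom, a minimal sketch of the fix (the window
size and lateness duration are assumptions; `rows` stands for the keyed,
timestamped input):

PCollection<Row> windowed =
    rows.apply(
        Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1)))
            // Let elements behind the watermark still contribute to the
            // aggregation instead of being silently dropped.
            .withAllowedLateness(Duration.standardMinutes(5)));

If you also set a custom trigger, Beam additionally requires an accumulation
mode (accumulatingFiredPanes() or discardingFiredPanes()).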


-Rui

On Thu, Aug 27, 2020 at 10:09 AM Minreng Wu  wrote:

> Hi Rui,
>
> Thanks for your advice!
>
> After reading Chapter 2&3 of *Streaming Systems* and some other
> materials, eventually I make it work! It indeed turned out to be an issue
> of not setting the trigger correctly. Previously, I didn't set the trigger
> & watermark so it would use the default settings. After I added
> `.withAllowedLateness()`, it can correctly materialize the window output as
> expected. Thank you so much for your help!
>
> Thanks & Regards,
> Minreng
>
>
> On Mon, Aug 24, 2020 at 1:58 PM Rui Wang  wrote:
>
>> Hi,
>>
>> I checked the query in your SO question and I think the SQL usage is
>> correct.
>>
>> My current guess is that the problem is how does watermark generate and
>> advance in KafkaIO. It could be either the watermark didn't pass the end of
>> your SQL window for aggregation or the data was lagging behind the
>> watermark so they are considered late data.
>>
>> One way to verify it is you can try to use TestStream as the source to
>> evaluate your pipeline and see whether it works well.
>>
>> -Rui
>>
>> On Mon, Aug 24, 2020 at 11:06 AM Minreng Wu  wrote:
>>
>>> Hi contributors,
>>>
>>> Sorry to bother you! I met a problem when I was trying to apply a
>>> windowing aggregation Beam SQL query to a Kafka input source.
>>>
>>> The details of the question are in the following link:
>>> https://stackoverflow.com/questions/63566057/how-to-integrate-beam-sql-windowing-query-with-kafkaio.
>>> And the version of the Beam Java SDK I used is *2.23.0*
>>>
>>> Really appreciate your help and advice! Stay safe and happy!
>>>
>>> Thanks and regards,
>>> Minreng
>>>
>>


Re: How to integrate Beam SQL windowing query with KafkaIO?

2020-08-24 Thread Rui Wang
Hi,

I checked the query in your SO question and I think the SQL usage is
correct.

My current guess is that the problem is how the watermark is generated and
advanced in KafkaIO. Either the watermark didn't pass the end of your SQL
aggregation window, or the data was lagging behind the watermark and was
therefore treated as late data.

One way to verify this is to use TestStream as the source to evaluate your
pipeline and see whether it works as expected.
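
Roughly like this (a sketch with assumed element type, coder, and timestamps;
`p` is a TestPipeline, and the windowing plus SqlTransform stays the same as in
your real pipeline):

TestStream<String> events =
    TestStream.create(StringUtf8Coder.of())
        .addElements(
            TimestampedValue.of("a", new Instant(0)),
            TimestampedValue.of("b", new Instant(0).plus(Duration.standardSeconds(30))))
        // Advance the watermark past the end of the first window so the
        // windowed aggregation is allowed to fire.
        .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(2)))
        .advanceWatermarkToInfinity();

PCollection<String> input = p.apply(events);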

-Rui

On Mon, Aug 24, 2020 at 11:06 AM Minreng Wu  wrote:

> Hi contributors,
>
> Sorry to bother you! I met a problem when I was trying to apply a
> windowing aggregation Beam SQL query to a Kafka input source.
>
> The details of the question are in the following link:
> https://stackoverflow.com/questions/63566057/how-to-integrate-beam-sql-windowing-query-with-kafkaio.
> And the version of the Beam Java SDK I used is *2.23.0*
>
> Really appreciate your help and advice! Stay safe and happy!
>
> Thanks and regards,
> Minreng
>


Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-23 Thread Rui Wang
The current Beam model does not guarantee any ordering after a GBK (i.e. the
Combine.perKey() in your pipeline), so you cannot expect the (C) step to see
elements in a specific order.

As I recall, the Dataflow runner has only very limited ordering support.
Hi +Reuven Lax, can you share your insights about it?


-Rui



On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim  wrote:

> Hi,
>
> My Beam pipeline is designed to work with an unbounded source KafkaIO.
> It roughly looks like below:
> p.apply(KafkaIO.read() ...)   // (A-1)
>   .apply(WithKeys.of(...).withKeyType(...))
>   .apply(Window.into(FixedWindows.of(...)))
>   .apply(Combine.perKey(...))  // (B)
>   .apply(Window.into(new GlobalWindows())) // to have per-key stats in
> (C)
>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
> Note that (C) has its own state which is expected to be fetched and
> updated by window results (B) in order of event-time.
>
> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>
>> p.apply(TextIO.read().from("test.txt"))  // (A-2)
>
> "text.txt" contains samples having a single key.
>
> I get a wrong result and it turns out that window results didn't feed into
> (C) in order.
> Is it because (A-2) makes the pipeline a bounded one?
>
> Q1. How to prevent this from happening?
> Q2. How do you guys usually write an integration test for an unbounded one
> with stateful function?
>
> Best,
>
> Dongwon
>


Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Rui Wang
I tried some code search in the Beam repo but didn't find the exact line of
code that throws your exception.

However, for Java classes used in primitives (ParDo, CombineFn) and in coders,
you very likely need to make them serializable (i.e. implement Serializable).
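
A minimal sketch of what that looks like (the field names are made up; the key
point is implementing Serializable in addition to the coder annotation):

import java.io.Serializable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class ProductCatalog implements Serializable {
  // AvroCoder needs a no-arg constructor and non-final fields it can reflect on.
  public String productId;
  public String category;

  public ProductCatalog() {}
}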


-Rui

On Wed, Jul 8, 2020 at 6:23 AM Kirill Zhdanovich  wrote:
>
> Hi!
> I'm using Apache Beam Java(2.19.0) with Dataflow. I created class and 
> annotated it with DefaultCoder
>
> @DefaultCoder(AvroCoder.class)
> public class ProductCatalog {
>
> When I trying to submit it to cluster I get an error:
>
> Caused by: java.io.NotSerializableException: ...common.ProductCatalog
>
> If I add `implements Serializable` to the class definition everything works 
> fine. In the Apache Beam guide, I don't see anything about using implements 
> Serializable. What I'm doing wrong? Thank you in advance for your help


Re: [ANNOUNCE] Beam 2.20.0 Released

2020-04-16 Thread Rui Wang
Note that due to an infrastructure bug, the website change failed to publish,
but the 2.20.0 artifacts are available to use right now.



-Rui

On Thu, Apr 16, 2020 at 11:45 AM Rui Wang  wrote:

> The Apache Beam team is pleased to announce the release of version 2.20.0.
>
> Apache Beam is an open source unified programming model to define and
> execute data processing pipelines, including ETL, batch and stream
> (continuous) processing. See https://beam.apache.org
>
> You can download the release here:
>
> https://beam.apache.org/get-started/downloads/
>
> This release includes bug fixes, features, and improvements detailed on
> the Beam blog: https://beam.apache.org/blog/2020/04/15/beam-2.20.0.html
>
> Thanks to everyone who contributed to this release, and we hope you enjoy
> using Beam 2.20.0.
> -- Rui Wang, on behalf of The Apache Beam team
>


[ANNOUNCE] Beam 2.20.0 Released

2020-04-16 Thread Rui Wang
The Apache Beam team is pleased to announce the release of version 2.20.0.

Apache Beam is an open source unified programming model to define and
execute data processing pipelines, including ETL, batch and stream
(continuous) processing. See https://beam.apache.org

You can download the release here:

https://beam.apache.org/get-started/downloads/

This release includes bug fixes, features, and improvements detailed on
the Beam blog: https://beam.apache.org/blog/2020/04/15/beam-2.20.0.html

Thanks to everyone who contributed to this release, and we hope you enjoy
using Beam 2.20.0.
-- Rui Wang, on behalf of The Apache Beam team


Re: Unbounded input join Unbounded input then write to Bounded Sink

2020-02-24 Thread Rui Wang
I see. So I guess I didn't fully understand the requirement:

Do you want a 1-minute window join on two unbounded sources that writes to the
sink when the window closes? Or is there an extra requirement that you also
want to write to the sink every minute per window?

For the former, you can do it by SQL:

pipeline.apply(KafkaIO.read() ... )
.apply(Window.into(FixedWindows.of(1 minute))
.apply(SqlTransform(
  "SELECT ... FROM
(select TUMBLE_START() as window_start, * FROM
stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_a
  JOIN
(select TUMBLE_START() as window_start, * FROM
stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_b
   on table_a.window_start = table_b.window_start ...")
.apply(HCatalogIO.write() ...)

But as Kenneth mentioned HCatalogIO might not work as expected.



For the latter, the mixed Java and SQL pipeline won't help you.



-Rui

On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles  wrote:

> I think actually it depends on the pipeline. You cannot do it all in SQL,
> but if you mix Java and SQL I think you can do this. If you write this:
>
> pipeline.apply(KafkaIO.read() ... )
> .apply(Window.into(FixedWindows.of(1 minute))
> .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
> .apply(HCatalogIO.write() ...)
>
> This should apply the SQL on each window. When the SQL does not do any
> windowing, it is required to be a "per-window" SQL execution. That is the
> spec for SqlTransform. If that does not work, please report your experience.
>
> But the SQL semantics do not require waiting. Today the stream-to-stream
> join will do a CoGroupByKey so it will wait. But SQL may in the future
> adopt a better join for this case that can output records with lower
> latency.
>
> It may be a bigger question whether HCatalogIO.write() has all the knobs
> you would like.
>
> Kenn
>
> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang  wrote:
>
>> SQL does not support such joins with your requirement: write to sink
>> after every 1 min after window closes.
>>
>> You might can use state and timer API to achieve your goal.
>>
>>
>>
>> -Rui
>>
>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram 
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to join inputs from Unbounded Sources then write to Bounded
>>> Sink.
>>> The pipeline I'm trying is:
>>> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>>>  And, a FixedWindow of 1 minute duration is applied.
>>>
>>> I'm expecting the inputs from unbounded sources joined within the
>>> current window to be written to the HCatalogIO Sink after every 1 min i.e
>>> after each window interval.
>>>
>>> Can someone please tell if this is a valid scenario and what is the
>>> expected behaviour from this kind of scenario?
>>>
>>> Regards,
>>> Shanta
>>>
>>


Re: Unbounded input join Unbounded input then write to Bounded Sink

2020-02-24 Thread Rui Wang
Sorry, please remove ".apply(Window.into(FixedWindows.of(1 minute))" from
the query above.



-Rui

On Mon, Feb 24, 2020 at 5:26 PM Rui Wang  wrote:

> I see. So I guess I wasn't fully understand the requirement:
>
> Do you want to have a 1-min window join on two unbounded sources and write
> to sink when the window closes ? Or there is an extra requirement such that
> you also want to write to sink every minute per window?
>
> For the former, you can do it by SQL:
>
> pipeline.apply(KafkaIO.read() ... )
> .apply(Window.into(FixedWindows.of(1 minute))
> .apply(SqlTransform(
>   "SELECT ... FROM
> (select TUMBLE_START() as window_start, * FROM
> stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_a
>   JOIN
> (select TUMBLE_START() as window_start, * FROM
> stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_b
>on table_a.window_start = table_b.window_start ...")
> .apply(HCatalogIO.write() ...)
>
> But as Kenneth mentioned HCatalogIO might not work as expected.
>
>
>
> For the latter, the mixed Java and SQL pipeline won't help you.
>
>
>
> -Rui
>
> On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles  wrote:
>
>> I think actually it depends on the pipeline. You cannot do it all in SQL,
>> but if you mix Java and SQL I think you can do this. If you write this:
>>
>> pipeline.apply(KafkaIO.read() ... )
>> .apply(Window.into(FixedWindows.of(1 minute))
>> .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
>> .apply(HCatalogIO.write() ...)
>>
>> This should apply the SQL on each window. When the SQL does not do any
>> windowing, it is required to be a "per-window" SQL execution. That is the
>> spec for SqlTransform. If that does not work, please report your experience.
>>
>> But the SQL semantics do not require waiting. Today the stream-to-stream
>> join will do a CoGroupByKey so it will wait. But SQL may in the future
>> adopt a better join for this case that can output records with lower
>> latency.
>>
>> It may be a bigger question whether HCatalogIO.write() has all the knobs
>> you would like.
>>
>> Kenn
>>
>> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang  wrote:
>>
>>> SQL does not support such joins with your requirement: write to sink
>>> after every 1 min after window closes.
>>>
>>> You might can use state and timer API to achieve your goal.
>>>
>>>
>>>
>>> -Rui
>>>
>>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <
>>> shantachakp...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to join inputs from Unbounded Sources then write to Bounded
>>>> Sink.
>>>> The pipeline I'm trying is:
>>>> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>>>>  And, a FixedWindow of 1 minute duration is applied.
>>>>
>>>> I'm expecting the inputs from unbounded sources joined within the
>>>> current window to be written to the HCatalogIO Sink after every 1 min i.e
>>>> after each window interval.
>>>>
>>>> Can someone please tell if this is a valid scenario and what is the
>>>> expected behaviour from this kind of scenario?
>>>>
>>>> Regards,
>>>> Shanta
>>>>
>>>


Re: Beam SQL Nested Rows are flatten by Calcite

2020-02-24 Thread Rui Wang
That is great. Feel free to send the patch and I can review it.

-Rui

On Mon, Feb 24, 2020, 3:54 PM Talat Uyarer 
wrote:

> Hi Rui,
>
> I solved the issue. After 1.21 version they are not getting flattened in
> LogicalPlan.
>
> Thanks for your help. I am going to create a patch for it.
>
> Talat
>
> On Sat, Feb 15, 2020 at 6:26 PM Rui Wang  wrote:
>
>> Because Calcite flattened Row so BeamSQL didn't need to deal with nested
>> Row structure (as they were flattened in LogicalPlan).
>>
>> Depends on how that patch works. Nested row might not immediately work
>> after you apply that patch.
>>
>>
>> -Rui
>>
>> On Fri, Feb 14, 2020 at 3:14 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Do you mean they were flattened before by calcite or Does beam flatten
>>> them too ?
>>>
>>>
>>>
>>> On Fri, Feb 14, 2020 at 1:21 PM Rui Wang  wrote:
>>>
>>>> Nested row types might be less well supported (r.g. Row) because they
>>>> were flattened before anyway.
>>>>
>>>>
>>>> -Rui
>>>>
>>>> On Fri, Feb 14, 2020 at 12:14 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Thank you for your response.
>>>>> I saw it and applied patch on calcite 1.20. However I realized
>>>>> BeamCalRel does not generate right code [1]to turn back Beam types. I am
>>>>> working on that now. Please let me know if apache beam support nested row
>>>>> types but I miss it.
>>>>>
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/646f596988be9d6a739090f48d2fed07c8dfc17c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L167
>>>>>
>>>>> On Fri, Feb 14, 2020 at 10:33 AM Rui Wang  wrote:
>>>>>
>>>>>> Calcite has improved to reconstruct ROW back in the output. See [1].
>>>>>> Beam need to update Calcite dependency to > 1.21 to adopt that.
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1]: https://jira.apache.org/jira/browse/CALCITE-3138
>>>>>>
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>> On Thu, Feb 13, 2020 at 9:05 PM Talat Uyarer <
>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to Beam SQL. But something is wrong. I have nested row
>>>>>>> records. I read them as Pcollection and apply Select * query and
>>>>>>> compare with initial rows. Looks like nested rows are flatten by 
>>>>>>> calcite.
>>>>>>> How do you have any idea how can I avoid this?
>>>>>>>
>>>>>>> I added a same testcase for my issue:
>>>>>>>
>>>>>>> Schema nestedSchema =
>>>>>>> Schema.builder()
>>>>>>> .addInt32Field("f_nestedInt")
>>>>>>> .addStringField("f_nestedString")
>>>>>>> .addInt32Field("f_nestedIntPlusOne")
>>>>>>> .build();
>>>>>>> Schema inputType =
>>>>>>> Schema.builder().addInt32Field("f_int").addRowField("f_row", 
>>>>>>> nestedSchema).build();
>>>>>>>
>>>>>>> PCollection input =
>>>>>>> pipeline.apply(
>>>>>>> Create.of(
>>>>>>> Row.withSchema(inputType)
>>>>>>> .addValues(
>>>>>>> 1, Row.withSchema(nestedSchema).addValues(312, 
>>>>>>> "CC", 313).build())
>>>>>>> .build())
>>>>>>> .withRowSchema(inputType))
>>>>>>> .setRowSchema(inputType);
>>>>>>>
>>>>>>> PCollection result =
>>>>>>> input
>>>>>>> .apply(
>>>>>>> SqlTransform.query(
>>>>>>> "SELECT * FROM PCOLLECTION"));
>>>>>>>
>>>>>>> PAssert.that(result)
>>>>>>> .containsInAnyOrder(Row.withSchema(inputType)
>>>>>>> .addValues(
>>>>>>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>>>>>> 313).build())
>>>>>>> .build());
>>>>>>>
>>>>>>>
>>>>>>> Thank you so much in advance.
>>>>>>>
>>>>>>>


Re: Unbounded input join Unbounded input then write to Bounded Sink

2020-02-24 Thread Rui Wang
SQL does not support such joins with your requirement of writing to the sink
every minute, i.e. after each window closes.

You might be able to use the state and timer API to achieve your goal, along
the lines of the sketch below.
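
A rough sketch (untested, with assumed types; the input must be keyed, e.g.
KV<String, Row>, for state and timers to apply) that buffers joined rows and
emits them once the watermark passes the end of the window:

class EmitAtWindowEnd extends DoFn<KV<String, Row>, Row> {

  @StateId("buffer")
  private final StateSpec<BagState<Row>> bufferSpec = StateSpecs.bag();

  @TimerId("windowEnd")
  private final TimerSpec windowEndSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<Row> buffer,
      @TimerId("windowEnd") Timer windowEnd) {
    buffer.add(c.element().getValue());
    // Fire once the watermark passes the end of the current window.
    windowEnd.set(window.maxTimestamp());
  }

  @OnTimer("windowEnd")
  public void onWindowEnd(OnTimerContext c, @StateId("buffer") BagState<Row> buffer) {
    for (Row row : buffer.read()) {
      c.output(row);  // a downstream sink transform writes these out
    }
    buffer.clear();
  }
}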



-Rui

On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram 
wrote:

> Hi,
>
> I am trying to join inputs from Unbounded Sources then write to Bounded
> Sink.
> The pipeline I'm trying is:
> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>  And, a FixedWindow of 1 minute duration is applied.
>
> I'm expecting the inputs from unbounded sources joined within the current
> window to be written to the HCatalogIO Sink after every 1 min i.e after
> each window interval.
>
> Can someone please tell if this is a valid scenario and what is the
> expected behaviour from this kind of scenario?
>
> Regards,
> Shanta
>


Re: Beam SQL Nested Rows are flatten by Calcite

2020-02-15 Thread Rui Wang
Because Calcite flattened Rows, BeamSQL didn't need to deal with nested Row
structures (they were already flattened in the LogicalPlan).

It depends on how that patch works; nested Rows might not immediately work
after you apply it.


-Rui

On Fri, Feb 14, 2020 at 3:14 PM Talat Uyarer 
wrote:

> Do you mean they were flattened before by calcite or Does beam flatten
> them too ?
>
>
>
> On Fri, Feb 14, 2020 at 1:21 PM Rui Wang  wrote:
>
>> Nested row types might be less well supported (r.g. Row) because they
>> were flattened before anyway.
>>
>>
>> -Rui
>>
>> On Fri, Feb 14, 2020 at 12:14 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Thank you for your response.
>>> I saw it and applied patch on calcite 1.20. However I realized
>>> BeamCalRel does not generate right code [1]to turn back Beam types. I am
>>> working on that now. Please let me know if apache beam support nested row
>>> types but I miss it.
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/646f596988be9d6a739090f48d2fed07c8dfc17c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L167
>>>
>>> On Fri, Feb 14, 2020 at 10:33 AM Rui Wang  wrote:
>>>
>>>> Calcite has improved to reconstruct ROW back in the output. See [1].
>>>> Beam need to update Calcite dependency to > 1.21 to adopt that.
>>>>
>>>>
>>>>
>>>> [1]: https://jira.apache.org/jira/browse/CALCITE-3138
>>>>
>>>>
>>>> -Rui
>>>>
>>>> On Thu, Feb 13, 2020 at 9:05 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to Beam SQL. But something is wrong. I have nested row
>>>>> records. I read them as Pcollection and apply Select * query and
>>>>> compare with initial rows. Looks like nested rows are flatten by calcite.
>>>>> How do you have any idea how can I avoid this?
>>>>>
>>>>> I added a same testcase for my issue:
>>>>>
>>>>> Schema nestedSchema =
>>>>> Schema.builder()
>>>>> .addInt32Field("f_nestedInt")
>>>>> .addStringField("f_nestedString")
>>>>> .addInt32Field("f_nestedIntPlusOne")
>>>>> .build();
>>>>> Schema inputType =
>>>>> Schema.builder().addInt32Field("f_int").addRowField("f_row", 
>>>>> nestedSchema).build();
>>>>>
>>>>> PCollection input =
>>>>> pipeline.apply(
>>>>> Create.of(
>>>>> Row.withSchema(inputType)
>>>>> .addValues(
>>>>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>>>> 313).build())
>>>>> .build())
>>>>> .withRowSchema(inputType))
>>>>> .setRowSchema(inputType);
>>>>>
>>>>> PCollection result =
>>>>> input
>>>>> .apply(
>>>>> SqlTransform.query(
>>>>> "SELECT * FROM PCOLLECTION"));
>>>>>
>>>>> PAssert.that(result)
>>>>> .containsInAnyOrder(Row.withSchema(inputType)
>>>>> .addValues(
>>>>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>>>> 313).build())
>>>>> .build());
>>>>>
>>>>>
>>>>> Thank you so much in advance.
>>>>>
>>>>>


Re: Beam SQL Nested Rows are flatten by Calcite

2020-02-14 Thread Rui Wang
Nested row types (e.g. Row within Row) might be less well supported because
they were flattened before anyway.


-Rui

On Fri, Feb 14, 2020 at 12:14 PM Talat Uyarer 
wrote:

> Thank you for your response.
> I saw it and applied patch on calcite 1.20. However I realized BeamCalRel
> does not generate right code [1]to turn back Beam types. I am working on
> that now. Please let me know if apache beam support nested row types but I
> miss it.
>
>
> [1]
> https://github.com/apache/beam/blob/646f596988be9d6a739090f48d2fed07c8dfc17c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L167
>
> On Fri, Feb 14, 2020 at 10:33 AM Rui Wang  wrote:
>
>> Calcite has improved to reconstruct ROW back in the output. See [1]. Beam
>> need to update Calcite dependency to > 1.21 to adopt that.
>>
>>
>>
>> [1]: https://jira.apache.org/jira/browse/CALCITE-3138
>>
>>
>> -Rui
>>
>> On Thu, Feb 13, 2020 at 9:05 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to Beam SQL. But something is wrong. I have nested row
>>> records. I read them as Pcollection and apply Select * query and
>>> compare with initial rows. Looks like nested rows are flatten by calcite.
>>> How do you have any idea how can I avoid this?
>>>
>>> I added a same testcase for my issue:
>>>
>>> Schema nestedSchema =
>>> Schema.builder()
>>> .addInt32Field("f_nestedInt")
>>> .addStringField("f_nestedString")
>>> .addInt32Field("f_nestedIntPlusOne")
>>> .build();
>>> Schema inputType =
>>> Schema.builder().addInt32Field("f_int").addRowField("f_row", 
>>> nestedSchema).build();
>>>
>>> PCollection input =
>>> pipeline.apply(
>>> Create.of(
>>> Row.withSchema(inputType)
>>> .addValues(
>>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>> 313).build())
>>> .build())
>>> .withRowSchema(inputType))
>>> .setRowSchema(inputType);
>>>
>>> PCollection result =
>>> input
>>> .apply(
>>> SqlTransform.query(
>>> "SELECT * FROM PCOLLECTION"));
>>>
>>> PAssert.that(result)
>>> .containsInAnyOrder(Row.withSchema(inputType)
>>> .addValues(
>>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>> 313).build())
>>> .build());
>>>
>>>
>>> Thank you so much in advance.
>>>
>>>


Re: Beam SQL Nested Rows are flatten by Calcite

2020-02-14 Thread Rui Wang
Calcite has been improved to reconstruct ROW in the output; see [1]. Beam
needs to update its Calcite dependency to > 1.21 to adopt that.



[1]: https://jira.apache.org/jira/browse/CALCITE-3138


-Rui

On Thu, Feb 13, 2020 at 9:05 PM Talat Uyarer 
wrote:

> Hi,
>
> I am trying to Beam SQL. But something is wrong. I have nested row
> records. I read them as Pcollection and apply Select * query and
> compare with initial rows. Looks like nested rows are flatten by calcite.
> How do you have any idea how can I avoid this?
>
> I added a same testcase for my issue:
>
> Schema nestedSchema =
> Schema.builder()
> .addInt32Field("f_nestedInt")
> .addStringField("f_nestedString")
> .addInt32Field("f_nestedIntPlusOne")
> .build();
> Schema inputType =
> Schema.builder().addInt32Field("f_int").addRowField("f_row", 
> nestedSchema).build();
>
> PCollection input =
> pipeline.apply(
> Create.of(
> Row.withSchema(inputType)
> .addValues(
> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
> 313).build())
> .build())
> .withRowSchema(inputType))
> .setRowSchema(inputType);
>
> PCollection result =
> input
> .apply(
> SqlTransform.query(
> "SELECT * FROM PCOLLECTION"));
>
> PAssert.that(result)
> .containsInAnyOrder(Row.withSchema(inputType)
> .addValues(
> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 313).build())
> .build());
>
>
> Thank you so much in advance.
>
>


Re: [ANNOUNCE] Beam 2.18.0 Released

2020-01-28 Thread Rui Wang
Thank you Udi for taking care of Beam 2.18.0 release!



-Rui

On Tue, Jan 28, 2020 at 10:59 AM Udi Meiri  wrote:

> The Apache Beam team is pleased to announce the release of version 2.18.0.
>
> Apache Beam is an open source unified programming model to define and
> execute data processing pipelines, including ETL, batch and stream
> (continuous) processing. See https://beam.apache.org
>
> You can download the release here:
>
> https://beam.apache.org/get-started/downloads/
>
> This release includes bug fixes, features, and improvements detailed on
> the Beam blog: https://beam.apache.org/blog/2020/01/13/beam-2.18.0.html
>
> Thanks to everyone who contributed to this release, and we hope you enjoy
> using Beam 2.18.0.
> -- Udi Meiri, on behalf of The Apache Beam team
>


Re: apache beam 2.16.0 ?

2019-09-18 Thread Rui Wang
Hi,

You can search (or ask) on dev@ for the progress of 2.16.0.

The short answer is that the 2.16.0 release is ongoing, and it will be
published once the remaining blockers are resolved.


-Rui

On Wed, Sep 18, 2019 at 9:34 PM Yu Watanabe  wrote:

> Hello.
>
> I would like to use 2.16.0 to diagnose container problem, however, looks
> like the job-server is not available on maven yet.
>
> RuntimeError: Unable to fetch remote job server jar at
> https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.16.0/beam-runners-flink-1.8-job-server-2.16.0.jar:
> HTTP Error 404: Not Found
>
> Checked maven repo and indeed there is no job-server 2.16.0  yet.
>
> https://mvnrepository.com/artifact/org.apache.beam/beam-runners-flink
>
>
> Will 2.16.0 released soon ?
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> yu.w.ten...@gmail.com
>


Re: Slowly changing lookup cache as a Table in BeamSql

2019-07-16 Thread Rui Wang
Another approach is to let BeamSQL support it natively, as the title of
this thread says: "as a Table in BeamSQL".

We might be able to define a table with properties saying that the table
returns a PCollectionView. By doing so we would have a trigger-based
PCollectionView available in the SQL rel nodes, so SQL would be able to
implement [*Pattern: Slowly-changing lookup cache*]. That way, users would
only need to construct such a table and pass it to SqlTransform.

Created a JIRA to track this idea:
https://jira.apache.org/jira/browse/BEAM-7758
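
For reference, the Java-level pattern that such a table would wrap looks
roughly like this (a sketch following the slowly-changing lookup cache pattern
linked below by Reza; the refresh interval is an assumption and the DoFn body
is a placeholder for a manual BigQuery client call):

PCollectionView<Map<String, String>> lookup =
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
     .apply(Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
         .discardingFiredPanes())
     .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
       @ProcessElement
       public void process(ProcessContext c) {
         // Placeholder: replace with a manual BigQuery client call that
         // loads the latest lookup table.
         c.output(Collections.singletonMap("id-1", "value-1"));
       }
     }))
     .apply(View.asSingleton());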


-Rui


On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni  wrote:

> Hi Rahul,
>
> FYI, that patterns is also available in the Beam docs  ( with updated code
> example )
> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>
> Please note in the DoFn that feeds the View.asSingleton() you will need to
> manually call BigQuery using the BigQuery client.
>
> Regards
>
> Reza
>
> On Tue, 16 Jul 2019 at 14:37, rahul patwari 
> wrote:
>
>> Hi,
>>
>> we are following [*Pattern: Slowly-changing lookup cache*] from
>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>
>> We have a use case to read slowly changing bounded data as a PCollection
>> along with the main PCollection from Kafka(windowed) and use it in the
>> query of BeamSql.
>>
>> Is it possible to design such a use case with Beam Java SDK?
>>
>> Approaches followed but not Successful:
>>
>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>> Transform(which applies Beam I/O on the
>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>> to PCollection Apply BeamSQL
>> Comments: Beam I/O reads data only once even though a long value is
>> generated from GenerateSequece with periodicity. The expectation is that
>> whenever a long value is generated, Beam I/O will be used to read the
>> latest data. Is this because of optimizations in the DAG? Can the
>> optimizations be overridden?
>>
>> 2) The pipeline is the same as approach 1. But, instead of using a
>> composite transform, a DoFn is used where a for loop will emit each Row of
>> the PCollection.
>> comments: The output PCollection is unbounded. But, we need a bounded
>> PCollection as this PCollection is used to JOIN with PCollection of each
>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>> PCollection inside a DoFn?
>>
>> Are there any better Approaches?
>>
>> Regards,
>> Rahul
>>
>>
>>
>
> --
>
> This email may be confidential and privileged. If you received this
> communication by mistake, please don't forward it to anyone else, please
> erase all copies and attachments, and please let me know that it has gone
> to the wrong person.
>
> The above terms reflect a potential business arrangement, are provided
> solely as a basis for further discussion, and are not intended to be and do
> not constitute a legally binding obligation. No legally binding obligations
> will be created, implied, or inferred until an agreement in final form is
> executed in writing by all parties involved.
>


Re: pubsub -> IO

2019-07-15 Thread Rui Wang
+user@beam.apache.org 


-Rui

On Mon, Jul 15, 2019 at 6:55 AM Chaim Turkel  wrote:

> Hi,
>   I am looking to write a pipeline that read from a mongo collection.
>   I would like to listen to a pubsub that will have a object that will
> tell me which collection and which time frame.
>   Is there a way to do this?
>
> Chaim
>
> --
>
>
> Loans are funded by
> FinWise Bank, a Utah-chartered bank located in Sandy,
> Utah, member FDIC, Equal
> Opportunity Lender. Merchant Cash Advances are
> made by Behalf. For more
> information on ECOA, click here
> . For important information about
> opening a new
> account, review Patriot Act procedures here
> .
> Visit Legal
>  to
> review our comprehensive program terms,
> conditions, and disclosures.
>


Re: [Java] Using a complex datastructure as Key for KV

2019-07-11 Thread Rui Wang
Hi Shannon, [1] is a good starting point on coders in the Java SDK.


[1]
https://beam.apache.org/documentation/programming-guide/#data-encoding-and-type-safety
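
As a concrete illustration (a sketch with assumed types, based on the
List-of-strings keys discussed in this thread, with `words` a
PCollection<String>), a coder can also be attached explicitly when inference
falls short:

PCollection<KV<List<String>, Long>> keyed =
    words
        .apply(MapElements
            .into(TypeDescriptors.kvs(
                TypeDescriptors.lists(TypeDescriptors.strings()),
                TypeDescriptors.longs()))
            .via((String w) -> KV.of(Arrays.asList(w), 1L)))
        // ListCoder is deterministic when its element coder is, so these
        // keys are safe to use with GroupByKey.
        .setCoder(KvCoder.of(ListCoder.of(StringUtf8Coder.of()), VarLongCoder.of()));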

Rui

On Thu, Jul 11, 2019 at 3:08 PM Shannon Duncan 
wrote:

> Was able to get it to use ArrayList by doing List> result =
> new ArrayList>();
>
> Then storing my keys in a separate array that I'll pass in as a side input
> to key for the list of lists.
>
> Thanks for the help, lemme know more in the future about how coders work
> and instantiate and I'd love to help contribute by adding some new coders.
>
> - Shannon
>
> On Thu, Jul 11, 2019 at 4:59 PM Shannon Duncan 
> wrote:
>
>> Will do. Thanks. A new coder for deterministic Maps would be great in the
>> future. Thank you!
>>
>> On Thu, Jul 11, 2019 at 4:58 PM Rui Wang  wrote:
>>
>>> I think Mike refers to ListCoder
>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java>
>>>  which
>>> is deterministic if its element is the same. Maybe you can search the repo
>>> for examples of ListCoder?
>>>
>>>
>>> -Rui
>>>
>>> On Thu, Jul 11, 2019 at 2:55 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
>>>> So ArrayList doesn't work either, so just a standard List?
>>>>
>>>> On Thu, Jul 11, 2019 at 4:53 PM Rui Wang  wrote:
>>>>
>>>>> Shannon, I agree with Mike on List is a good workaround if your
>>>>> element within list is deterministic and you are eager to make your new
>>>>> pipeline working.
>>>>>
>>>>>
>>>>> Let me send back some pointers to adding new coder later.
>>>>>
>>>>>
>>>>> -Rui
>>>>>
>>>>> On Thu, Jul 11, 2019 at 2:45 PM Shannon Duncan <
>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>
>>>>>> I just started learning Java today to attempt to convert our python
>>>>>> pipelines to Java to take advantage of key features that Java has. I have
>>>>>> no idea how I would create a new coder and include it in for beam to
>>>>>> recognize.
>>>>>>
>>>>>> If you can point me in the right direction of where it hooks together
>>>>>> I might be able to figure that out. I can duplicate MapCoder and try to
>>>>>> make changes, but how will beam know to pick up that coder for a 
>>>>>> groupByKey?
>>>>>>
>>>>>> Thanks!
>>>>>> Shannon
>>>>>>
>>>>>> On Thu, Jul 11, 2019 at 4:42 PM Rui Wang  wrote:
>>>>>>
>>>>>>> It could be just straightforward to create a SortedMapCoder for
>>>>>>> TreeMap. Just add checks on map instances and then change
>>>>>>> verifyDeterministic.
>>>>>>>
>>>>>>> If this is a common need we could just submit it into Beam repo.
>>>>>>>
>>>>>>> [1]:
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java#L146
>>>>>>>
>>>>>>> On Thu, Jul 11, 2019 at 2:26 PM Mike Pedersen 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> There isn't a coder for deterministic maps in Beam, so even if your
>>>>>>>> datastructure is deterministic, Beam will assume the serialized bytes
>>>>>>>> aren't deterministic.
>>>>>>>>
>>>>>>>> You could make one using the MapCoder as a guide:
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
>>>>>>>> Just change it such that the exception in VerifyDeterministic is
>>>>>>>> removed and when decoding it instantiates a TreeMap or such instead of 
>>>>>>>> a
>>>>>>>> HashMap.
>>>>>>>>
>>>>>>>> Alternatively, you could just represent your key as a sorted list
>>>>>>>> of KV pairs. Lookups could be done using binary search if necessary.
>>>>>>>>
>>>>>>>> Mike
>>>>>>>>
>>>>>>>> Den tor. 11. jul. 2019 kl. 22.41 skrev Shannon Duncan <
>>>>>>>> joseph.dun...@liveramp.com>:
>>>>>>>>
>>>>>>>>> So I'm working on essentially doing a word-count on a complex data
>>>>>>>>> structure.
>>>>>>>>>
>>>>>>>>> I tried just using a HashMap as the Structure, but that didn't
>>>>>>>>> work because it is non-deterministic.
>>>>>>>>>
>>>>>>>>> However when Given a LinkedHashMap or TreeMap which is
>>>>>>>>> deterministic the SDK complains that it's non-deterministic when 
>>>>>>>>> trying to
>>>>>>>>> use it as a key for GroupByKey.
>>>>>>>>>
>>>>>>>>> What would be an appropriate Map style data structure that would
>>>>>>>>> be deterministic enough for Apache Beam to accept it as a key?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Shannon
>>>>>>>>>
>>>>>>>>


Re: [Java] Using a complex datastructure as Key for KV

2019-07-11 Thread Rui Wang
I think Mike refers to ListCoder
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java>,
which is deterministic as long as its element coder is deterministic. Maybe you
can search the repo for examples of ListCoder usage?


-Rui

On Thu, Jul 11, 2019 at 2:55 PM Shannon Duncan 
wrote:

> So ArrayList doesn't work either, so just a standard List?
>
> On Thu, Jul 11, 2019 at 4:53 PM Rui Wang  wrote:
>
>> Shannon, I agree with Mike on List is a good workaround if your element
>> within list is deterministic and you are eager to make your new pipeline
>> working.
>>
>>
>> Let me send back some pointers to adding new coder later.
>>
>>
>> -Rui
>>
>> On Thu, Jul 11, 2019 at 2:45 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> I just started learning Java today to attempt to convert our python
>>> pipelines to Java to take advantage of key features that Java has. I have
>>> no idea how I would create a new coder and include it in for beam to
>>> recognize.
>>>
>>> If you can point me in the right direction of where it hooks together I
>>> might be able to figure that out. I can duplicate MapCoder and try to make
>>> changes, but how will beam know to pick up that coder for a groupByKey?
>>>
>>> Thanks!
>>> Shannon
>>>
>>> On Thu, Jul 11, 2019 at 4:42 PM Rui Wang  wrote:
>>>
>>>> It could be just straightforward to create a SortedMapCoder for
>>>> TreeMap. Just add checks on map instances and then change
>>>> verifyDeterministic.
>>>>
>>>> If this is a common need we could just submit it into Beam repo.
>>>>
>>>> [1]:
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java#L146
>>>>
>>>> On Thu, Jul 11, 2019 at 2:26 PM Mike Pedersen 
>>>> wrote:
>>>>
>>>>> There isn't a coder for deterministic maps in Beam, so even if your
>>>>> datastructure is deterministic, Beam will assume the serialized bytes
>>>>> aren't deterministic.
>>>>>
>>>>> You could make one using the MapCoder as a guide:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
>>>>> Just change it such that the exception in VerifyDeterministic is
>>>>> removed and when decoding it instantiates a TreeMap or such instead of a
>>>>> HashMap.
>>>>>
>>>>> Alternatively, you could just represent your key as a sorted list of
>>>>> KV pairs. Lookups could be done using binary search if necessary.
>>>>>
>>>>> Mike
>>>>>
>>>>> Den tor. 11. jul. 2019 kl. 22.41 skrev Shannon Duncan <
>>>>> joseph.dun...@liveramp.com>:
>>>>>
>>>>>> So I'm working on essentially doing a word-count on a complex data
>>>>>> structure.
>>>>>>
>>>>>> I tried just using a HashMap as the Structure, but that didn't work
>>>>>> because it is non-deterministic.
>>>>>>
>>>>>> However when Given a LinkedHashMap or TreeMap which is deterministic
>>>>>> the SDK complains that it's non-deterministic when trying to use it as a
>>>>>> key for GroupByKey.
>>>>>>
>>>>>> What would be an appropriate Map style data structure that would be
>>>>>> deterministic enough for Apache Beam to accept it as a key?
>>>>>>
>>>>>> Thanks,
>>>>>> Shannon
>>>>>>
>>>>>


Re: [Java] Using a complex datastructure as Key for KV

2019-07-11 Thread Rui Wang
Shannon, I agree with Mike that List is a good workaround if the elements
within the list are deterministic and you are eager to get your new pipeline
working.


Let me send some pointers on adding a new coder later.


-Rui

On Thu, Jul 11, 2019 at 2:45 PM Shannon Duncan 
wrote:

> I just started learning Java today to attempt to convert our python
> pipelines to Java to take advantage of key features that Java has. I have
> no idea how I would create a new coder and include it in for beam to
> recognize.
>
> If you can point me in the right direction of where it hooks together I
> might be able to figure that out. I can duplicate MapCoder and try to make
> changes, but how will beam know to pick up that coder for a groupByKey?
>
> Thanks!
> Shannon
>
> On Thu, Jul 11, 2019 at 4:42 PM Rui Wang  wrote:
>
>> It could be just straightforward to create a SortedMapCoder for TreeMap.
>> Just add checks on map instances and then change verifyDeterministic.
>>
>> If this is a common need we could just submit it into Beam repo.
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java#L146
>>
>> On Thu, Jul 11, 2019 at 2:26 PM Mike Pedersen 
>> wrote:
>>
>>> There isn't a coder for deterministic maps in Beam, so even if your
>>> datastructure is deterministic, Beam will assume the serialized bytes
>>> aren't deterministic.
>>>
>>> You could make one using the MapCoder as a guide:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
>>> Just change it such that the exception in VerifyDeterministic is removed
>>> and when decoding it instantiates a TreeMap or such instead of a HashMap.
>>>
>>> Alternatively, you could just represent your key as a sorted list of KV
>>> pairs. Lookups could be done using binary search if necessary.
>>>
>>> Mike
>>>
>>> Den tor. 11. jul. 2019 kl. 22.41 skrev Shannon Duncan <
>>> joseph.dun...@liveramp.com>:
>>>
>>>> So I'm working on essentially doing a word-count on a complex data
>>>> structure.
>>>>
>>>> I tried just using a HashMap as the Structure, but that didn't work
>>>> because it is non-deterministic.
>>>>
>>>> However when Given a LinkedHashMap or TreeMap which is deterministic
>>>> the SDK complains that it's non-deterministic when trying to use it as a
>>>> key for GroupByKey.
>>>>
>>>> What would be an appropriate Map style data structure that would be
>>>> deterministic enough for Apache Beam to accept it as a key?
>>>>
>>>> Thanks,
>>>> Shannon
>>>>
>>>


Re: [Java] Using a complex datastructure as Key for KV

2019-07-11 Thread Rui Wang
It could be fairly straightforward to create a SortedMapCoder for TreeMap:
just add checks on the map instances and change verifyDeterministic [1].

If this is a common need, we could submit it into the Beam repo.

[1]:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java#L146
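
A rough sketch of what such a coder could look like (hypothetical class, not
part of Beam; it only loosely follows MapCoder):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.TreeMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;

public class SortedMapCoder<K extends Comparable<K>, V>
    extends CustomCoder<TreeMap<K, V>> {

  private final Coder<K> keyCoder;
  private final Coder<V> valueCoder;

  public static <K extends Comparable<K>, V> SortedMapCoder<K, V> of(
      Coder<K> keyCoder, Coder<V> valueCoder) {
    return new SortedMapCoder<>(keyCoder, valueCoder);
  }

  private SortedMapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
    this.keyCoder = keyCoder;
    this.valueCoder = valueCoder;
  }

  @Override
  public void encode(TreeMap<K, V> map, OutputStream out) throws IOException {
    // TreeMap iterates in key order, so the encoded bytes are deterministic
    // whenever the key and value coders are.
    VarIntCoder.of().encode(map.size(), out);
    for (Map.Entry<K, V> entry : map.entrySet()) {
      keyCoder.encode(entry.getKey(), out);
      valueCoder.encode(entry.getValue(), out);
    }
  }

  @Override
  public TreeMap<K, V> decode(InputStream in) throws IOException {
    int size = VarIntCoder.of().decode(in);
    TreeMap<K, V> map = new TreeMap<>();
    for (int i = 0; i < size; i++) {
      K key = keyCoder.decode(in);
      map.put(key, valueCoder.decode(in));
    }
    return map;
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    // Unlike MapCoder, don't throw unconditionally: determinism here only
    // depends on the component coders.
    verifyDeterministic(this, "Key coder must be deterministic", keyCoder);
    verifyDeterministic(this, "Value coder must be deterministic", valueCoder);
  }
}

Since nothing registers it automatically, a pipeline would attach it explicitly,
e.g. setCoder(KvCoder.of(SortedMapCoder.of(StringUtf8Coder.of(), VarLongCoder.of()),
VarLongCoder.of())) on the keyed PCollection.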

On Thu, Jul 11, 2019 at 2:26 PM Mike Pedersen  wrote:

> There isn't a coder for deterministic maps in Beam, so even if your
> datastructure is deterministic, Beam will assume the serialized bytes
> aren't deterministic.
>
> You could make one using the MapCoder as a guide:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
> Just change it such that the exception in VerifyDeterministic is removed
> and when decoding it instantiates a TreeMap or such instead of a HashMap.
>
> Alternatively, you could just represent your key as a sorted list of KV
> pairs. Lookups could be done using binary search if necessary.
>
> Mike
>
> Den tor. 11. jul. 2019 kl. 22.41 skrev Shannon Duncan <
> joseph.dun...@liveramp.com>:
>
>> So I'm working on essentially doing a word-count on a complex data
>> structure.
>>
>> I tried just using a HashMap as the Structure, but that didn't work
>> because it is non-deterministic.
>>
>> However when Given a LinkedHashMap or TreeMap which is deterministic the
>> SDK complains that it's non-deterministic when trying to use it as a key
>> for GroupByKey.
>>
>> What would be an appropriate Map style data structure that would be
>> deterministic enough for Apache Beam to accept it as a key?
>>
>> Thanks,
>> Shannon
>>
>


Re:

2019-06-24 Thread Rui Wang
I tried your example and hit the same exception on the direct runner. I think
MAP with Row as the value type is not well supported in SQL, so I filed
https://jira.apache.org/jira/browse/BEAM-7623 to improve it.


-Rui

On Mon, Jun 24, 2019 at 4:41 AM Vishwas Bm  wrote:

> Hi Rui,
>
> I was trying out a use case where we have a map with key as string and
> value as Row. When we try to access the primitive field in the Row we are
> getting below exception.
>
> Caused by: java.lang.NoSuchFieldException: color
> at java.lang.Class.getDeclaredField(Class.java:2070)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.runtime.SqlFunctions.structAccess(SqlFunctions.java:2405)
>
> Below is the schema and the sql query used:
>
> Schema primitiveFieldsScema =
> Schema.builder().addStringField("color").build();
> Schema inputSchema =
> Schema.builder().addMapField("mapWithValueAsRow", FieldType.STRING,
> FieldType.row(primitiveFieldsScema)).build();
> Map mapWithValueAsRow = new HashMap<>();
> Row row =
> Row.withSchema(primitiveFieldsScema).addValue("RED").build();
> mapWithValueAsRow.put("key", row);
>
> Row rowOfMap =
> Row.withSchema(inputSchema).addValue(mapWithValueAsRow).build();
>
> Query used:
>
> *   select  PCOLLECTION.mapWithValueAsRow['key'].color as
> color  from PCOLLECTION*
>
>
> In git there are scenario
> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMapTest.java>
> of accessing map using select query. But in those values are all of
> primitive types. In my case the value is of type Row.
> So can you let me know whether this is supported or this is a bug ?
>
>
> *Thanks & Regards,*
>
> *Vishwas *
>
>
> On Thu, Jun 20, 2019 at 11:25 PM Rui Wang  wrote:
>
>> Oops I made a mistake, I didn't work on[1] but actually [2]
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java
>> [2]:
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
>>
>> -Rui
>>
>> On Thu, Jun 20, 2019 at 9:58 AM Rui Wang  wrote:
>>
>>> I wrote some tests on nested row selection in BeamSQL[1]. Those test
>>> cases test some behaviors of nested row selection that BeamSQL supports(but
>>> it's not a complete list).
>>>
>>> You could check what are tested so that are supported. Also it's welcome
>>> to extend those tests to cover more behaviors.
>>>
>>>
>>> [1]:
>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java
>>>
>>>
>>> -Rui
>>>
>>> On Thu, Jun 20, 2019 at 4:25 AM Andrew Pilloud 
>>> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> Unfortunately you are receiving an UnsupportedOperationException
>>>> because selecting nested rows is not supported by Calcite. You can select
>>>> fields out of the nested row, but not the row itself. There are some recent
>>>> bug fixes in this area in Calcite 1.20, so it might be worth trying that.
>>>> There has been a lot of work in this area in Calcite in the past year, so it
>>>> is also possible the work that remains is in Beam. We have a bug open on
>>>> the issue in Beam: https://issues.apache.org/jira/browse/BEAM-5189
>>>>
>>>> Beam is repackaging Calcite by copying and relocating the code using
>>>> the gradle relocate rule. The code is unchanged, but
>>>> 'org.apache.beam.repackaged.beam_sdks_java_extensions_sql' is prepended to the
>>>> package names so it won't conflict with the Calcite in use by some of the Beam
>>>> runners. If you want to change the version to a local snapshot of Calcite
>>>> for development you can modify it here:
>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/build.gradle#L72
>>>>
>>>> Andrew
>>>>
>>>> On Thu, Jun 20, 2019 at 3:02 AM alex goos  wrote:
>>>>
>>>>> Beam 2.13, I'm trying to do SqlTransform on Row records with
>>>>> hierarchical structures. Records having (nullable) arrays of (nullable)
>>>>> sub-records:
>>>>>
>>>>> "select p.recordType, p.listOfServiceData.se

Re: AvroUtils converting generic record to Beam Row causes class cast exception

2019-06-19 Thread Rui Wang
PR 8376 is merged and it should be in the 2.14.0 release.


-Rui

On Mon, Apr 22, 2019 at 10:49 AM Rui Wang  wrote:

> I see. I created this PR [1] to ask feedback from the reviewer who knows
> better on Avro in Beam.
>
> -Rui
>
>
> [1]: https://github.com/apache/beam/pull/8376
>
>
> On Sun, Apr 21, 2019 at 11:19 PM Vishwas Bm  wrote:
>
>> Hi Rui,
>>
>> I checked the AvroUtils code. There is a static initializer block that
>> registers Avro Timestamp Conversion functions for the logical type
>> timestamp-millis.
>>
>> *// Code Snippet below*
>> static {
>> // This works around a bug in the Avro library (AVRO-1891) around
>> SpecificRecord's handling
>> // of DateTime types.
>>SpecificData.get().addLogicalTypeConversion(new TimeConversions.
>> TimestampConversion());
>>GenericData.get().addLogicalTypeConversion(new TimeConversions.
>> TimestampConversion());
>> }
>>
>> Because of this, when deserializing a generic record from Kafka using
>> KafkaAvroDeserializer, the long value produced at the producer end gets
>> converted to joda-time during deserialization.
>>
>> Next, when we try to convert this genericRecord to Row as part of the
>> AvroUtils.toBeamRowStrict function, we again try to convert the value
>> received to joda-time.
>> But an exception is thrown because there is a cast to Long.
>>
>> *// Code Snippet Below:*
>> else if (logicalType instanceof LogicalTypes.TimestampMillis) {
>>  return convertDateTimeStrict((Long) value, fieldType); *<--
>> Class cast exception is thrown here, as we are typecasting from JodaTime to
>> Long*
>> }
>>
>> private static Object convertDateTimeStrict (Long value, Schema.FieldType
>> fieldType) {
>>  checkTypeName(fieldType.getTypeName(), TypeName.DATETIME, "
>> dateTime");
>>  return new Instant(value);  <--  *Creates a JodaTime
>> Instance here*
>> }
>>
>>
>> *Thanks & Regards,*
>>
>> *Vishwas *
>>
>>
>>
>> On Tue, Apr 16, 2019 at 9:18 AM Rui Wang  wrote:
>>
>>> I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to
>>> Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from
>>> GenericRecord, and tries to cast objects based on their types (and
>>> cast(object) to long for "timestamp-millis"). see [1].
>>>
>>> So in order to use `AvroUtils.toBeamRowStrict`, the generated
>>> GenericRecord should have long for "timestamp-millis".
>>>
>>> The schema you pasted looks right. Not sure why generated class is Joda
>>> time (is it controlled by some flags?). But at least you could write a
>>> small function to do schema conversion for your need.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672
>>>
>>>
>>> Rui
>>>
>>>
>>> On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm  wrote:
>>>
>>>> Hi Rui,
>>>>
>>>> I agree that by converting it to long, there will be no error.
>>>> But the KafkaIO is giving a GenericRecord with attribute of type
>>>> JodaTime. Now I convert it to long. Then in the  AvroUtils.toBeamRowStrict
>>>> again converts it to JodaTime.
>>>>
>>>> I used the avro tools 1.8.2 jar, for the below schema and I see that
>>>> the generated class has a JodaTime attribute.
>>>>
>>>> {
>>>> "name": "timeOfRelease",
>>>> "type":
>>>> {
>>>> "type": "long",
>>>> "logicalType": "timestamp-millis",
>>>> "connect.version": 1,
>>>> "connect.name":
>>>> "org.apache.kafka.connect.data.Timestamp"
>>>> }
>>>>  }
>>>>
>>>> *Attribute type in generated class:*
>>>> private org.joda.time.DateTime timeOfRelease;
>>>>
>>>>
>>>> So not sure why this type casting is required.
>>>>
>>>>
>>>> *Thanks & Regards,*
>>>>
>>>> *Vishwas *
>>>>
>>>>
>>>> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang  wrote:
>>>>
>>>>> Read from th
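
For illustration, a hedged sketch of the workaround discussed in this thread: put epoch millis back into the "timestamp-millis" field before handing the GenericRecord to AvroUtils.toBeamRowStrict. The field name "timeOfRelease" is taken from the schema quoted above; the helper itself is hypothetical:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;
import org.joda.time.ReadableInstant;

static Row toBeamRowWithMillis(GenericRecord record, Schema beamSchema) {
  Object value = record.get("timeOfRelease");
  if (value instanceof ReadableInstant) {
    // Undo the Joda-Time conversion registered by the Avro logical-type
    // converters so the strict cast to Long inside AvroUtils succeeds.
    record.put("timeOfRelease", ((ReadableInstant) value).getMillis());
  }
  return AvroUtils.toBeamRowStrict(record, beamSchema);
}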

Re: SQL massively more resource-intensive? Memory leak?

2019-06-04 Thread Rui Wang
Sorry I couldn't be more helpful at this moment. Created a JIRA for this
issue: https://jira.apache.org/jira/browse/BEAM-7489


-Rui

On Mon, Jun 3, 2019 at 8:14 PM dekl...@gmail.com  wrote:

> Yes, it works but I think it has the same problem. It's a lot slower so it
> took me hours of running it, but by the end the memory usage was high and
> the CPU about 100% so it seems to be the same problem.
>
> Worth noting perhaps that when I use the DirectRunner I have to turn
> enforceImmutability off because of
> https://issues.apache.org/jira/browse/BEAM-1714
>
> On 2019/06/03 17:48:30, Rui Wang  wrote:
> > Ha sorry I was only reading screenshots but ignored your other comments.
> So
> > count fn indeed worked.
> >
> > Can I ask if your sql pipeline works on direct runner?
> >
> >
> > -Rui
> >
> > On Mon, Jun 3, 2019 at 10:39 AM Rui Wang  wrote:
> >
> > > BeamSQL actually only converts SELECT COUNT(*) query to the Java
> pipeline
> > > that calls Java's builtin Count[1] transform.
> > >
> > > Could you implement your pipeline by Count transform to see whether
> this
> > > memory issue still exists? By doing so we could narrow down problem a
> bit.
> > > If using Java directly without going through SQL code path and it
> works, we
> > > will know that BeamSQL does not generate a working pipeline for your
> SELECT
> > > COUNT(*) query.
> > >
> > >
> > > [1]:
> > >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54
> > >
> > > On Sat, Jun 1, 2019 at 9:30 AM dekl...@gmail.com 
> > > wrote:
> > >
> > >> Is using Beam SQL just massively more resource-intensive or is there a
> > >> bug somewhere here (in my code or elsewhere)? (I'm using Flink runner)
> > >>
> > >> Here is my code:
> > >> https://pastebin.com/nNtc9ZaG
> > >>
> > >> Here is the error I get (truncated at the end because it's so long and
> > >> seemingly repetitive) when I run the SQL transform and my memory/CPU
> usage
> > >> skyrockets:
> > >> https://pastebin.com/mywmkCQi
> > >>
> > >> For example,
> > >>
> > >> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM,
> > >> everything working fine:
> > >>
> > >>
> `rowStream.apply(Combine.globally(Count.combineFn()).withoutDefaults()).apply(myPrint());`
> > >>
> > >> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+
> > >> RAM, soon crashes:
> > >> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM
> > >> PCOLLECTION")).apply(myPrint());`
> > >>
> > >> I can't imagine this is expected behavior but maybe I'm just ignorant
> of
> > >> how SQL is implemented.
> > >>
> > >> Most of this code is just getting Schema Registry Avro Kafka messages
> > >> into a Row stream. There have been discussions on the mailing list
> recently
> > >> about how to do that. This is the best I could do. If that part is
> > >> incorrect I'd be glad to know.
> > >>
> > >> Any help appreciated. Thank you.
> > >>
> > >>
> >
>
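
As a side note on the DirectRunner workaround mentioned above, a hedged sketch of turning off the immutability enforcement (assuming the DirectOptions interface from the direct runner; the checks are useful, so this is for debugging only):

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DirectOptions options = PipelineOptionsFactory.create().as(DirectOptions.class);
// Work around BEAM-1714 while testing on the direct runner.
options.setEnforceImmutability(false);
Pipeline p = Pipeline.create(options);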


Re: SQL massively more resource-intensive? Memory leak?

2019-06-03 Thread Rui Wang
Ah, sorry, I was only reading the screenshots and ignored your other comments. So
the Count fn indeed worked.

Can I ask if your SQL pipeline works on the direct runner?


-Rui

On Mon, Jun 3, 2019 at 10:39 AM Rui Wang  wrote:

> BeamSQL actually only converts SELECT COUNT(*) query to the Java pipeline
> that calls Java's builtin Count[1] transform.
>
> Could you implement your pipeline by Count transform to see whether this
> memory issue still exists? By doing so we could narrow down problem a bit.
> If using Java directly without going through SQL code path and it works, we
> will know that BeamSQL does not generate a working pipeline for your SELECT
> COUNT(*) query.
>
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54
>
> On Sat, Jun 1, 2019 at 9:30 AM dekl...@gmail.com 
> wrote:
>
>> Is using Beam SQL just massively more resource-intensive or is there a
>> bug somewhere here (in my code or elsewhere)? (I'm using Flink runner)
>>
>> Here is my code:
>> https://pastebin.com/nNtc9ZaG
>>
>> Here is the error I get (truncated at the end because it's so long and
>> seemingly repetitive) when I run the SQL transform and my memory/CPU usage
>> skyrockets:
>> https://pastebin.com/mywmkCQi
>>
>> For example,
>>
>> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM,
>> everything working fine:
>>
>> `rowStream.apply(Combine.globally(Count.combineFn()).withoutDefaults()).apply(myPrint());`
>>
>> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+
>> RAM, soon crashes:
>> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM
>> PCOLLECTION")).apply(myPrint());`
>>
>> I can't imagine this is expected behavior but maybe I'm just ignorant of
>> how SQL is implemented.
>>
>> Most of this code is just getting Schema Registry Avro Kafka messages
>> into a Row stream. There have been discussions on the mailing list recently
>> about how to do that. This is the best I could do. If that part is
>> incorrect I'd be glad to know.
>>
>> Any help appreciated. Thank you.
>>
>>


Re: SQL massively more resource-intensive? Memory leak?

2019-06-03 Thread Rui Wang
BeamSQL actually only converts the SELECT COUNT(*) query to a Java pipeline
that calls Java's built-in Count [1] transform.

Could you implement your pipeline with the Count transform to see whether this
memory issue still exists? By doing so we could narrow down the problem a bit.
If using Java directly without going through the SQL code path works, we
will know that BeamSQL does not generate a working pipeline for your SELECT
COUNT(*) query.


[1]:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54

On Sat, Jun 1, 2019 at 9:30 AM dekl...@gmail.com  wrote:

> Is using Beam SQL just massively more resource-intensive or is there a bug
> somewhere here (in my code or elsewhere)? (I'm using Flink runner)
>
> Here is my code:
> https://pastebin.com/nNtc9ZaG
>
> Here is the error I get (truncated at the end because it's so long and
> seemingly repetitive) when I run the SQL transform and my memory/CPU usage
> skyrockets:
> https://pastebin.com/mywmkCQi
>
> For example,
>
> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM, everything
> working fine:
>
> `rowStream.apply(Combine.globally(Count.combineFn()).withoutDefaults()).apply(myPrint());`
>
> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+ RAM,
> soon crashes:
> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM
> PCOLLECTION")).apply(myPrint());`
>
> I can't imagine this is expected behavior but maybe I'm just ignorant of
> how SQL is implemented.
>
> Most of this code is just getting Schema Registry Avro Kafka messages into
> a Row stream. There have been discussions on the mailing list recently
> about how to do that. This is the best I could do. If that part is
> incorrect I'd be glad to know.
>
> Any help appreciated. Thank you.
>
>


Re: Question about unbounded in-memory PCollection

2019-05-07 Thread Rui Wang
Does TestStream.java [1] satisfy your need?



-Rui

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java

On Tue, May 7, 2019 at 2:47 PM Chengzhi Zhao 
wrote:

> Hi Beam Team,
>
> I am new here and have recently been studying the programming guide. I have a
> question about the in-memory data,
> https://beam.apache.org/documentation/programming-guide/#creating-a-pcollection
>
> Is there a way to create unbounded PCollection from the in-memory
> collection? I want to test the unbounded PCollection locally and don't know
> what's the easiest way to get unbounded PCollection. Please let me know if
> I am doing something wrong or I should use a file system to do it.
>
> Thanks in advance!
>
> Best,
> Chengzhi
>
> On Tue, May 7, 2019 at 5:46 PM Chengzhi Zhao 
> wrote:
>
>> Hi Beam Team,
>>
>> I am new here and have recently been studying the programming guide. I have a
>> question about the in-memory data,
>> https://beam.apache.org/documentation/programming-guide/#creating-a-pcollection
>>
>> Is there a way to create unbounded PCollection from the in-memory
>> collection? I want to test the unbounded PCollection locally and don't know
>> what's the easiest way to get unbounded PCollection. Please let me know if
>> I am doing something wrong or I should use a file system to do it.
>>
>> Thanks in advance!
>>
>> Best,
>> Chengzhi
>>
>
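
For illustration, a minimal hedged sketch of building an unbounded-style in-memory input with TestStream, assuming an existing Pipeline object named "pipeline" (element values and timestamps here are made up; TestStream needs a runner that supports it, such as the direct runner):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

TestStream<String> events =
    TestStream.create(StringUtf8Coder.of())
        .addElements(TimestampedValue.of("a", new Instant(0)))
        .advanceWatermarkTo(new Instant(1000))
        .addElements(TimestampedValue.of("b", new Instant(1500)))
        // Until the watermark is advanced to infinity, the resulting
        // PCollection behaves like an unbounded source.
        .advanceWatermarkToInfinity();

PCollection<String> input = pipeline.apply(events);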


Re: AvroUtils converting generic record to Beam Row causes class cast exception

2019-04-22 Thread Rui Wang
I see. I created this PR [1] to ask for feedback from the reviewer who knows
Avro in Beam better.

-Rui


[1]: https://github.com/apache/beam/pull/8376


On Sun, Apr 21, 2019 at 11:19 PM Vishwas Bm  wrote:

> Hi Rui,
>
> I checked the AvroUtils code. There is a static initializer block that
> registers Avro Timestamp Conversion functions for the logical type
> timestamp-millis.
>
> *// Code Snippet below*
> static {
> // This works around a bug in the Avro library (AVRO-1891) around
> SpecificRecord's handling
> // of DateTime types.
>SpecificData.get().addLogicalTypeConversion(new TimeConversions.
> TimestampConversion());
>GenericData.get().addLogicalTypeConversion(new TimeConversions.
> TimestampConversion());
> }
>
> Because of this, when deserializing a generic record from Kafka using
> KafkaAvroDeserializer, the long value produced at the producer end gets
> converted to joda-time during deserialization.
>
> Next, when we try to convert this genericRecord to Row as part of the
> AvroUtils.toBeamRowStrict function, we again try to convert the value
> received to joda-time.
> But an exception is thrown because there is a cast to Long.
>
> *// Code Snippet Below:*
> else if (logicalType instanceof LogicalTypes.TimestampMillis) {
>  return convertDateTimeStrict((Long) value, fieldType); *<--
> Class cast exception is thrown here, as we are typecasting from JodaTime to
> Long*
> }
>
> private static Object convertDateTimeStrict (Long value, Schema.FieldType
> fieldType) {
>  checkTypeName(fieldType.getTypeName(), TypeName.DATETIME, "
> dateTime");
>  return new Instant(value);  <--  *Creates a JodaTime
> Instance here*
> }
>
>
> *Thanks & Regards,*
>
> *Vishwas *
>
>
>
> On Tue, Apr 16, 2019 at 9:18 AM Rui Wang  wrote:
>
>> I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to
>> Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from
>> GenericRecord, and tries to cast objects based on their types (and
>> cast(object) to long for "timestamp-millis"). see [1].
>>
>> So in order to use `AvroUtils.toBeamRowStrict`, the generated
>> GenericRecord should have long for "timestamp-millis".
>>
>> The schema you pasted looks right. Not sure why generated class is Joda
>> time (is it controlled by some flags?). But at least you could write a
>> small function to do schema conversion for your need.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672
>>
>>
>> Rui
>>
>>
>> On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm  wrote:
>>
>>> Hi Rui,
>>>
>>> I agree that by converting it to long, there will be no error.
>>> But the KafkaIO is giving a GenericRecord with attribute of type
>>> JodaTime. Now I convert it to long. Then in the  AvroUtils.toBeamRowStrict
>>> again converts it to JodaTime.
>>>
>>> I used the avro tools 1.8.2 jar, for the below schema and I see that the
>>> generated class has a JodaTime attribute.
>>>
>>> {
>>> "name": "timeOfRelease",
>>> "type":
>>> {
>>> "type": "long",
>>> "logicalType": "timestamp-millis",
>>> "connect.version": 1,
>>> "connect.name":
>>> "org.apache.kafka.connect.data.Timestamp"
>>> }
>>>  }
>>>
>>> *Attribute type in generated class:*
>>> private org.joda.time.DateTime timeOfRelease;
>>>
>>>
>>> So not sure why this type casting is required.
>>>
>>>
>>> *Thanks & Regards,*
>>>
>>> *Vishwas *
>>>
>>>
>>> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang  wrote:
>>>
>>>> Read from the code and seems like as the logical type
>>>> "timestamp-millis" means, it's expecting millis in Long as values under
>>>> this logical type.
>>>>
>>>> So if you can convert joda-time to millis before calling
>>>> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception
>>>> will be gone.
>>>>
>>>> -Rui
>>>>
>>>>
>>>> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cw

Re: AvroUtils converting generic record to Beam Row causes class cast exception

2019-04-15 Thread Rui Wang
I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to
Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from
GenericRecord, and tries to cast objects based on their types (and
cast(object) to long for "timestamp-millis"). see [1].

So in order to use `AvroUtils.toBeamRowStrict`, the generated GenericRecord
should have long for "timestamp-millis".

The schema you pasted looks right. Not sure why the generated class uses Joda
time (is it controlled by some flags?). But at least you could write a
small function to do the schema conversion for your need.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672


Rui


On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm  wrote:

> Hi Rui,
>
> I agree that by converting it to long, there will be no error.
> But the KafkaIO is giving a GenericRecord with attribute of type JodaTime.
> Now I convert it to long. Then in the  AvroUtils.toBeamRowStrict again
> converts it to JodaTime.
>
> I used the avro tools 1.8.2 jar, for the below schema and I see that the
> generated class has a JodaTime attribute.
>
> {
> "name": "timeOfRelease",
> "type":
> {
> "type": "long",
> "logicalType": "timestamp-millis",
> "connect.version": 1,
> "connect.name":
> "org.apache.kafka.connect.data.Timestamp"
> }
>  }
>
> *Attribute type in generated class:*
> private org.joda.time.DateTime timeOfRelease;
>
>
> So not sure why this type casting is required.
>
>
> *Thanks & Regards,*
>
> *Vishwas *
>
>
> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang  wrote:
>
>> Read from the code and seems like as the logical type "timestamp-millis"
>> means, it's expecting millis in Long as values under this logical type.
>>
>> So if you can convert joda-time to millis before calling
>> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception
>> will be gone.
>>
>> -Rui
>>
>>
>> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik  wrote:
>>
>>> +dev 
>>>
>>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm  wrote:
>>>
>>>> Hi,
>>>>
>>>> Below is my pipeline:
>>>>
>>>> KafkaSource (KafkaIO.read) --> Pardo ---> BeamSql
>>>> ---> KafkaSink(KafkaIO.write)
>>>>
>>>>
>>>> The avro schema of the topic has a field of logical type
>>>> timestamp-millis.  KafkaIO.read transform is creating a
>>>> KafkaRecord, where this field is being converted to
>>>> joda-time.
>>>>
>>>> In my Pardo transform, I am trying to use the AvroUtils class methods
>>>> to convert the generic record to Beam Row and getting below class cast
>>>> exception for the joda-time attribute.
>>>>
>>>>  AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>>>
>>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot
>>>> be cast to java.lang.Long
>>>> at
>>>> org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>>>> at
>>>> org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>>>
>>>> I have opened a jira https://issues.apache.org/jira/browse/BEAM-7073
>>>> for this
>>>>
>>>>
>>>>
>>>> *Thanks & Regards,*
>>>>
>>>> *Vishwas *
>>>>
>>>>


Re: AvroUtils converting generic record to Beam Row causes class cast exception

2019-04-15 Thread Rui Wang
Reading from the code, it seems that, as the logical type "timestamp-millis"
implies, it expects millis as Long values under this logical type.

So if you can convert joda-time to millis before calling
"AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception
will be gone.

-Rui


On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik  wrote:

> +dev 
>
> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm  wrote:
>
>> Hi,
>>
>> Below is my pipeline:
>>
>> KafkaSource (KafkaIO.read) --> Pardo ---> BeamSql
>> ---> KafkaSink(KafkaIO.write)
>>
>>
>> The avro schema of the topic has a field of logical type
>> timestamp-millis.  KafkaIO.read transform is creating a
>> KafkaRecord, where this field is being converted to
>> joda-time.
>>
>> In my Pardo transform, I am trying to use the AvroUtils class methods to
>> convert the generic record to Beam Row and getting below class cast
>> exception for the joda-time attribute.
>>
>>  AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>
>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be
>> cast to java.lang.Long
>> at
>> org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>> at
>> org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>
>> I have opened a jira https://issues.apache.org/jira/browse/BEAM-7073 for
>> this
>>
>>
>>
>> *Thanks & Regards,*
>>
>> *Vishwas *
>>
>>


Re: Pipeline manager/scheduler frameworks

2019-02-08 Thread Rui Wang
Apache Airflow is a scheduling system that can help manage data pipelines.
I have seen Airflow used to manage a few thousand Hive/Spark/Presto
pipelines.


-Rui

On Fri, Feb 8, 2019 at 4:08 PM Sridevi Nookala <
snook...@parallelwireless.com> wrote:

> Hi,
>
>
> Our analytics app has many data pipelines, some in Python/Java (using
> Beam), etc.
>
> Any suggestions for a pipeline manager/scheduler framework that
> manages/orchestrates these different pipelines?
>
>
> thanks
>
> Sri
>


Re: How to call Oracle stored proc in JdbcIO

2019-02-05 Thread Rui Wang
I see. I wanted to get some feedback from the community to see if calling a proc
before running SQL makes sense in JdbcIO. If that doesn't make sense then I
can close this JIRA.


-Rui

On Tue, Feb 5, 2019 at 3:00 PM Juan Carlos Garcia 
wrote:

> I believe this is not a missing feature, as the question is more about
> what you expect from this procedure.
>
> Like reading back a ref cursor into a PCollection, or just doing an insert
> / update via the stored procedure.
>
> Going forward, in the JDBC realm you just need to create a prepared
> statement with something like this:
>
> {call procedure_name(?, ?, ?)}
>
> But then the question is what do you expect from it?
>
> BTW, JdbcIO is just a very simple ParDo; you can create your own when
> dealing with anything special from Oracle.
>
> Best regards
>
> JC
>
>
>
> On Tue, Feb 5, 2019, 23:03 Rui Wang  wrote:
>
>> Assuming this is a missing feature. Created
>> https://jira.apache.org/jira/browse/BEAM-6525 to track it.
>>
>> -Rui
>>
>> On Fri, Jan 25, 2019 at 10:35 AM Rui Wang  wrote:
>>
>>> Hi Community,
>>>
>>> There is a stackoverflow question [1] asking how to call Oracle stored
>>> proc in Beam via JdbcIO. I know very less on JdbcIO and Oracle, so just
>>> help ask here to say if anyone know: does JdbcIO support call stored proc?
>>>
>>> If there is no such support, I will create a JIRA for it and reply to
>>> the question.
>>>
>>>  -Rui
>>>
>>> [1]:
>>> https://stackoverflow.com/questions/54364783/how-to-call-an-oracle-stored-proc-in-apache-beam
>>>
>>
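
For illustration, since JdbcIO has no built-in stored-procedure support per this thread, a hedged sketch of calling a procedure from a plain DoFn with standard JDBC; the driver URL, credentials, and procedure name are hypothetical:

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.beam.sdk.transforms.DoFn;

class CallProcFn extends DoFn<String, Void> {
  private transient Connection connection;

  @Setup
  public void setup() throws Exception {
    // Hypothetical JDBC URL and credentials.
    connection = DriverManager.getConnection(
        "jdbc:oracle:thin:@//host:1521/service", "user", "pass");
  }

  @ProcessElement
  public void processElement(@Element String id) throws Exception {
    // {call ...} is the standard JDBC escape syntax for stored procedures.
    try (CallableStatement stmt = connection.prepareCall("{call my_schema.my_proc(?)}")) {
      stmt.setString(1, id);
      stmt.execute();
    }
  }

  @Teardown
  public void teardown() throws Exception {
    if (connection != null) {
      connection.close();
    }
  }
}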


Re: How to call Oracle stored proc in JdbcIO

2019-02-05 Thread Rui Wang
Assuming this is a missing feature. Created
https://jira.apache.org/jira/browse/BEAM-6525 to track it.

-Rui

On Fri, Jan 25, 2019 at 10:35 AM Rui Wang  wrote:

> Hi Community,
>
> There is a stackoverflow question [1] asking how to call Oracle stored
> proc in Beam via JdbcIO. I know very less on JdbcIO and Oracle, so just
> help ask here to say if anyone know: does JdbcIO support call stored proc?
>
> If there is no such support, I will create a JIRA for it and reply to the
> question.
>
>  -Rui
>
> [1]:
> https://stackoverflow.com/questions/54364783/how-to-call-an-oracle-stored-proc-in-apache-beam
>


Re: First steps: Beam SQL over ProtoBuf

2019-02-01 Thread Rui Wang
It's awesome! Proto to/from Row will be a great utility!

-Rui

On Fri, Feb 1, 2019 at 9:59 AM Alex Van Boxel  wrote:

> Hi all,
>
> I got my first test pipeline running with *Beam SQL over ProtoBuf*. I was
> so excited I need to shout this to the world ;-)
>
>
> https://github.com/anemos-io/proto-beam/blob/master/transform/src/test/java/io/anemos/protobeam/transform/beamsql/BeamSqlPipelineTest.java
>
> A lot of work is planned but this is the first step. If you're invested in
> ProtoBuf and use Beam, follow the repo:
> https://github.com/anemos-io/proto-beam
> 
>
>  _/
> _/ Alex Van Boxel
>


How to call Oracle stored proc in JdbcIO

2019-01-25 Thread Rui Wang
Hi Community,

There is a Stack Overflow question [1] asking how to call an Oracle stored proc
in Beam via JdbcIO. I don't know much about JdbcIO and Oracle, so I'm just asking
here to see if anyone knows: does JdbcIO support calling a stored proc?

If there is no such support, I will create a JIRA for it and reply to the
question.

 -Rui

[1]:
https://stackoverflow.com/questions/54364783/how-to-call-an-oracle-stored-proc-in-apache-beam


Re: Suggestion or Alternative simples to read file from FTP

2019-01-03 Thread Rui Wang
For calling an external service, it's described in [1] as a pattern, with a
small code sample.

However, why not write a script to prepare the data first and then write a
pipeline to process it?

[1]:
https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1

On Thu, Jan 3, 2019 at 6:13 AM Henrique Molina 
wrote:

> Hi Folks,
> I'm a newbie in Beam, but I'm looking for some way to read a file stored on
> FTP.
>
> First of all, I could create a ParDo using FTPClient (Commons Net) that
> returns the byte[] contents of a *.csv file.
> A second ParDo would create the CSV.
> A third ParDo would use TextIO to read the lines.
>
> Could somebody share some sources and materials about that?
> Or do you have a simpler alternative?
> Thanks & Regards
>
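
For illustration, a hedged sketch of the first step described above: a DoFn that fetches a remote file over FTP with Commons Net and emits its contents as bytes (host, credentials, and error handling are simplified and hypothetical):

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;

class FetchFtpFileFn extends DoFn<String, byte[]> {
  @ProcessElement
  public void processElement(@Element String remotePath, OutputReceiver<byte[]> out)
      throws Exception {
    FTPClient ftp = new FTPClient();
    try {
      ftp.connect("ftp.example.com");   // hypothetical host
      ftp.login("user", "pass");        // hypothetical credentials
      ftp.enterLocalPassiveMode();
      ftp.setFileType(FTP.BINARY_FILE_TYPE);
      // retrieveFileStream returns null if the file cannot be opened;
      // real code should check for that and for reply codes.
      try (InputStream in = ftp.retrieveFileStream(remotePath);
          ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
          buffer.write(chunk, 0, n);
        }
        out.output(buffer.toByteArray());
      }
      ftp.completePendingCommand();
    } finally {
      if (ftp.isConnected()) {
        ftp.logout();
        ftp.disconnect();
      }
    }
  }
}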


Re: BigQueryIO failure handling for writes

2018-11-16 Thread Rui Wang
To the first issue you are facing:

In BeamSQL, we tried to solve a similar requirement.

BeamSQL supports reading JSON-format messages from Pubsub, writing to
BigQuery, and writing messages that fail to parse to another Pubsub topic.
BeamSQL uses a pre-processing transform to parse the JSON payload, like what
you have done. In that transform, BeamSQL tags problematic messages
(PubsubMessageToRow.java#L86), and after that transform, it adds another
transform to send the tagged messages to a separate topic (PubsubIOJsonTable.java#L148
<https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java#L148>).


-Rui

On Fri, Nov 16, 2018 at 1:58 PM Jeff Klukas  wrote:

> I'm trying to write a robust pipeline that takes input from PubSub and
> writes to BigQuery. For every PubsubMessage that is not successfully
> written to BigQuery, I'd like to get the original PubsubMessage back and be
> able to write to an error output collection. I'm not sure this is quite
> possible, though.
>
> The first issue is that BigQueryIO's withFormatFunction doesn't seem to
> provide any error handling. If my formatFunction raises an exception, it
> will bubble up to a PipelineExecutionException and kill the job. I'd like
> to be able to catch the exception instead and send the original payload to
> an error output collection. To get around this, we're using a
> pre-processing transform to parse our JSON payload into a TableRow as a
> separate step, then calling BigQueryIO.writeTableRows (which is documented
> as something to avoid).
>
> Similarly, I'd like to be able to recover the original message if a
> failure occurs after formatFunction and BigQuery rejects the insert.
> WriteResult.getFailedInserts() initially seemed to do this, but it looks to
> always return TableRow rather than the original pre-formatFunction message.
>
> Also, I found that failed inserts by default raise an exception that stops
> processing. I found that I had to set
> InsertRetryPolicy.retryTransientErrors() in order to avoid failed inserts
> bubbling up to PipelineExecutionException.
>
> Are there details I'm missing of the existing API that would allow me to
> do the kind of error handling I'm talking about here?
> Is setting a non-default InsertRetryPolicy required for getFailedInserts,
> or is that a bug?
> Do others see a need for changes to the BigQueryIO.Write API to enable
> better error handling?
>


Re: coder issue?

2018-08-17 Thread Rui Wang
Hi Robin,

I have the same question here. If we don't implement a coder for class Accum
but only add implements Serializable to class Accum, how is the accumulator
coder generated?

-Rui

On Fri, Aug 17, 2018 at 11:50 AM Mahesh Vangala 
wrote:

> Thanks, Robin.
> Based on Robin's comment above, I looked into the CombineFn test script in
> the Beam git repo, and implemented a getCoder method along the lines of that
> script. (
> https://github.com/vangalamaheshh/my-beam/blob/master/variant-caller/src/main/java/pipelines/variant_caller/AddLines.java
> )
> Do you think the getCoder implementation is not necessary?
> Thanks for your help though.
> Much appreciated!
>
> - Mahesh
>
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com <http://mvangala.com>*
>
>
> On Fri, Aug 17, 2018 at 1:01 PM Robin Qiu  wrote:
>
>> Hi Mahesh,
>>
>> I think you have the NullPointerException because your Accumulator is not
>> initialized properly.
>>
>> In your createAccumulator() method, you created an Accum object without
>> setting its line field. So later when accum.line got accessed, you got the
>> exception.
>>
>> By initializing the Accum class you should be able to fix this problem.
>> (e.g. String line = "";, instead of only String line;)
>>
>> Hope this helps,
>> Robin
>>
>> On Thu, Aug 16, 2018 at 12:14 PM Rui Wang  wrote:
>>
>>> Sorry I forgot to attach the PR link:
>>> https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342
>>>
>>> -Rui
>>>
>>> On Thu, Aug 16, 2018 at 12:13 PM Rui Wang  wrote:
>>>
>>>> Hi Mahesh,
>>>>
>>>> I think I had the same NPE when I explored self defined combineFn. I
>>>> think your combineFn might still need to define a coder to help Beam run it
>>>> in distributed environment. Beam tries to invoke coder somewhere and then
>>>> throw a NPE as there is no one defined.
>>>>
>>>> Here is a PR I wrote that defined a AccumulatingCombineFn and
>>>> implemented a coder for that for you reference.
>>>>
>>>> -Rui
>>>>
>>>> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala <
>>>> vangalamahe...@gmail.com> wrote:
>>>>
>>>>> Hello Robin -
>>>>>
>>>>> Thank you so much for your help.
>>>>> I added Serializable to Accum, and I got the following error. (Sorry,
>>>>> for being a pain. I hope once I get past the initial hump ...)
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource
>>>>> getEstimatedSizeBytes
>>>>>
>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>>
>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1
>>>>> ms and produced 1 files and 9 bundles
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM
>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
>>>>> verifyUnmodifiedThrowingCheckedExceptions
>>>>>
>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
>>>>> #structuralValue method which does not return true when the encoding of 
>>>>> the
>>>>> elements is equal. Element KV{null,
>>>>> pipelines.variant_caller.AddLines$Accum@52449030}
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM
>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
>>>>> verifyUnmodifiedThrowingCheckedExceptions
>>>>>
>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
>>>>> #structuralValue method which does not return true when the encoding of 
>>>>> the
>>>>> elements is equal. Element KV{null,
>>>>> pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>>>>
>>>>> Aug 16, 2018 3:00:09 PM
>>>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
>>>>> verifyUnmodifiedThrowingCheckedExceptions
>>>>>
>>>>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
>>>>> #structuralValue method which does not return true when the encoding of 
>>>>> the
>>>>> elements is equal.
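
For illustration, one hedged way to give Beam an accumulator coder explicitly is to override getAccumulatorCoder on the CombineFn. SerializableCoder is used here on the assumption that Accum implements Serializable, as suggested earlier in the thread; AvroCoder or a hand-written coder are alternatives:

import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SerializableCoder;

// Inside the CombineFn<String, Accum, String> implementation:
@Override
public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder)
    throws CannotProvideCoderException {
  // Accum must implement Serializable for SerializableCoder to work.
  return SerializableCoder.of(Accum.class);
}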

Re: coder issue?

2018-08-16 Thread Rui Wang
Sorry I forgot to attach the PR link:
https://github.com/apache/beam/pull/6154/files#diff-7358f3f0511940ea565e6584f652ed02R342

-Rui

On Thu, Aug 16, 2018 at 12:13 PM Rui Wang  wrote:

> Hi Mahesh,
>
> I think I had the same NPE when I explored self defined combineFn. I think
> your combineFn might still need to define a coder to help Beam run it in
> distributed environment. Beam tries to invoke coder somewhere and then
> throw a NPE as there is no one defined.
>
> Here is a PR I wrote that defined a AccumulatingCombineFn and implemented
> a coder for that for you reference.
>
> -Rui
>
> On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala 
> wrote:
>
>> Hello Robin -
>>
>> Thank you so much for your help.
>> I added Serializable to Accum, and I got the following error. (Sorry, for
>> being a pain. I hope once I get past the initial hump ...)
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource
>> getEstimatedSizeBytes
>>
>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>
>> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>>
>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms
>> and produced 1 files and 9 bundles
>>
>> Aug 16, 2018 3:00:09 PM
>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
>> verifyUnmodifiedThrowingCheckedExceptions
>>
>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
>> #structuralValue method which does not return true when the encoding of the
>> elements is equal. Element KV{null,
>> pipelines.variant_caller.AddLines$Accum@52449030}
>>
>> Aug 16, 2018 3:00:09 PM
>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
>> verifyUnmodifiedThrowingCheckedExceptions
>>
>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
>> #structuralValue method which does not return true when the encoding of the
>> elements is equal. Element KV{null,
>> pipelines.variant_caller.AddLines$Accum@59bb25e2}
>>
>> Aug 16, 2018 3:00:09 PM
>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
>> verifyUnmodifiedThrowingCheckedExceptions
>>
>> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
>> #structuralValue method which does not return true when the encoding of the
>> elements is equal. Element KV{null,
>> pipelines.variant_caller.AddLines$Accum@7076d18e}
>>
>> Exception in thread "main"
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.NullPointerException
>>
>> at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(
>> DirectRunner.java:332)
>>
>> at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(
>> DirectRunner.java:302)
>>
>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>
>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>
>> at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>
>> Caused by: java.lang.NullPointerException
>>
>> at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>>
>> at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>>
>> *--*
>> *Mahesh Vangala*
>> *(Ph) 443-326-1957*
>> *(web) mvangala.com <http://mvangala.com>*
>>
>>
>> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu  wrote:
>>
>>> Hello Mahesh,
>>>
>>> You can add "implements Serializable" to the Accum class, then it should
>>> work.
>>>
>>> By the way, in Java String is immutable, so in order to change, for
>>> example, accum.line, you need to write accum.line = accum.line.concat(line).
>>>
>>> Best,
>>> Robin
>>>
>>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala <
>>> vangalamahe...@gmail.com> wrote:
>>>
>>>> Hello all -
>>>>
>>>> I am trying to run a barebone beam pipeline to understand the "combine"
>>>> logic. I am from python world trying to learn java beam sdk due to my use
>>>> case of ETL with spark cluster. So, pardon me for my grotesque java code :)
>>>>
>>>> I appreciate if you could nudge me in the right path with this error:
>>>>

Re: coder issue?

2018-08-16 Thread Rui Wang
Hi Mahesh,

I think I had the same NPE when I explored a self-defined CombineFn. I think
your CombineFn might still need to define a coder to help Beam run it in a
distributed environment. Beam tries to invoke the coder somewhere and then
throws an NPE because none is defined.

Here is a PR I wrote that defined an AccumulatingCombineFn and implemented a
coder for it, for your reference.

-Rui

On Thu, Aug 16, 2018 at 12:03 PM Mahesh Vangala 
wrote:

> Hello Robin -
>
> Thank you so much for your help.
> I added Serializable to Accum, and I got the following error. (Sorry, for
> being a pain. I hope once I get past the initial hump ...)
>
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource
> getEstimatedSizeBytes
>
> INFO: Filepattern test_in.csv matched 1 files with total size 36
>
> Aug 16, 2018 3:00:09 PM org.apache.beam.sdk.io.FileBasedSource split
>
> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms
> and produced 1 files and 9 bundles
>
> Aug 16, 2018 3:00:09 PM
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
> verifyUnmodifiedThrowingCheckedExceptions
>
> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
> #structuralValue method which does not return true when the encoding of the
> elements is equal. Element KV{null,
> pipelines.variant_caller.AddLines$Accum@52449030}
>
> Aug 16, 2018 3:00:09 PM
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
> verifyUnmodifiedThrowingCheckedExceptions
>
> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
> #structuralValue method which does not return true when the encoding of the
> elements is equal. Element KV{null,
> pipelines.variant_caller.AddLines$Accum@59bb25e2}
>
> Aug 16, 2018 3:00:09 PM
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector
> verifyUnmodifiedThrowingCheckedExceptions
>
> WARNING: Coder of type class org.apache.beam.sdk.coders.KvCoder has a
> #structuralValue method which does not return true when the encoding of the
> elements is equal. Element KV{null,
> pipelines.variant_caller.AddLines$Accum@7076d18e}
>
> Exception in thread "main"
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.NullPointerException
>
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(
> DirectRunner.java:332)
>
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(
> DirectRunner.java:302)
>
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>
> at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>
> Caused by: java.lang.NullPointerException
>
> at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:35)
>
> at pipelines.variant_caller.AddLines.mergeAccumulators(AddLines.java:1)
>
> *--*
> *Mahesh Vangala*
> *(Ph) 443-326-1957*
> *(web) mvangala.com *
>
>
> On Thu, Aug 16, 2018 at 2:41 PM Robin Qiu  wrote:
>
>> Hello Mahesh,
>>
>> You can add "implements Serializable" to the Accum class, then it should
>> work.
>>
>> By the way, in Java String is immutable, so in order to change, for
>> example, accum.line, you need to write accum.line = accum.line.concat(line).
>>
>> Best,
>> Robin
>>
>> On Thu, Aug 16, 2018 at 10:42 AM Mahesh Vangala 
>> wrote:
>>
>>> Hello all -
>>>
>>> I am trying to run a barebone beam pipeline to understand the "combine"
>>> logic. I am from python world trying to learn java beam sdk due to my use
>>> case of ETL with spark cluster. So, pardon me for my grotesque java code :)
>>>
>>> I appreciate if you could nudge me in the right path with this error:
>>> (please see below)
>>>
>>> Here's my code: (read lines from input file and output the same lines to
>>> outfile)
>>>
>>> public class VariantCaller
>>>
>>> {
>>>
>>>
>>>
>>> public static void main( String[] args )
>>>
>>> {
>>>
>>> PipelineOptions opts = PipelineOptionsFactory.fromArgs(args
>>> ).create();
>>>
>>> Pipeline p = Pipeline.create(opts);
>>>
>>> PCollection<String> lines = p.apply(TextIO.read().from(
>>> "test_in.csv"));
>>>
>>> PCollection<String> mergedLines = lines.apply(Combine.globally(
>>> new AddLines()));
>>>
>>> mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>
>>> p.run();
>>>
>>> }
>>>
>>> }
>>>
>>>
>>> AddLines Class:
>>>
>>>
>>> public class AddLines extends CombineFn<String, AddLines.Accum, String>
>>> {
>>>
>>>   /**
>>>
>>>*
>>>
>>>*/
>>>
>>>   private static final long serialVersionUID = 1L;
>>>
>>>
>>>   public static class Accum {
>>>
>>> String line;
>>>
>>>   }
>>>
>>>
>>>   @Override
>>>
>>>   public Accum createAccumulator() { return new Accum(); }
>>>
>>>
>>>   @Override
>>>
>>>   public Accum addInput(Accum accum, String line) {
>>>