Re: Count(distinct) not working in beam sql

2023-11-11 Thread Talat Uyarer via user
Hi,

I saw this a little late. I implemented a custom count-distinct for our
streaming use case. If you are looking for something close enough but not
exact, you can use my UDF. It uses the HyperLogLogPlus algorithm, which is
an efficient and scalable way to estimate cardinality with a controlled
level of accuracy.

I put it on a GitHub Gist:
https://gist.github.com/talatuyarer/eb3a3796dcc93ede8f99d61961934a34
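For anyone who cannot pull the gist, a rough sketch of what such a UDAF can
look like is below. It is not the gist's code: it assumes the
com.clearspring.analytics:stream HyperLogLogPlus implementation is on the
classpath, and the class name and precision value are illustrative only. The
accumulator must be serializable (or given an explicit coder) so the runner
can checkpoint and merge it.

import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import org.apache.beam.sdk.transforms.Combine;

public class ApproxDistinctFn
    extends Combine.CombineFn<String, HyperLogLogPlus, Long> {

  @Override
  public HyperLogLogPlus createAccumulator() {
    // Precision 14 keeps the relative error around 1%; tune as needed.
    return new HyperLogLogPlus(14);
  }

  @Override
  public HyperLogLogPlus addInput(HyperLogLogPlus acc, String input) {
    acc.offer(input);
    return acc;
  }

  @Override
  public HyperLogLogPlus mergeAccumulators(Iterable<HyperLogLogPlus> accs) {
    HyperLogLogPlus merged = createAccumulator();
    try {
      for (HyperLogLogPlus acc : accs) {
        merged.addAll(acc);
      }
    } catch (CardinalityMergeException e) {
      throw new RuntimeException(e);
    }
    return merged;
  }

  @Override
  public Long extractOutput(HyperLogLogPlus acc) {
    return acc.cardinality();
  }
}

It can then be registered on the transform, for example
SqlTransform.query("SELECT approx_distinct(identifier) ...")
    .registerUdaf("approx_distinct", new ApproxDistinctFn())
and used in place of COUNT(DISTINCT identifier).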

On Fri, Nov 3, 2023 at 8:02 AM Alexey Romanenko 
wrote:

> Unfortunatelly, Beam SQL doesn’t support COUNT(DISTINCT) aggregation.
>
> More details about “why" is on this discussion [1] and the related open
> issue for that here [2].
>
> —
> Alexey
>
> [1] https://lists.apache.org/thread/hvmy6d5dls3m8xcnf74hfmy1xxfgj2xh
> 
> [2] https://github.com/apache/beam/issues/19398
> 
>
>
> On 2 Nov 2023, at 20:52, Goutham Miryala 
> wrote:
>
> Hey Team,
>
> We're trying to implement an aggregation which involves *several
> trillions of rows *using apache beam sql.
> However I'm getting an exception
> Exception in thread "main" java.lang.UnsupportedOperationException: Does
> not support COUNT DISTINCT
>
> Here's the code for doing the aggregation:
>
> PCollection aggregate = joinedCollection.apply("Aggregation",
> SqlTransform.query("SELECT" +
> "exchange_name as adexchange," +
> "strategy," +
> "platform," +
> "segment," +
> "auction_type," +
> "placement_type," +
> "country," +
> "COALESCE(loss, 0) AS loss_code," +
> "COUNT(DISTINCT identifier) AS uniques," +
> "no_bid_reason," +
> "SUM(1) AS auctions," +
> "SUM(CASE WHEN cpm_bid > 0 THEN 1 ELSE 0 END) AS 
> bids," +
> "SUM(cpm_bid) AS total_bid_price," +
> "SUM(CASE WHEN loss = 0 THEN 1 END) AS wins," +
> "app_bundle AS app_bundle," +
> "model_id AS model_id," +
> "identifier_type AS identifier_type," +
> "promotion_id AS promotion_id," +
> "sub_floor_bid_min_price_cohort AS 
> sub_floor_bid_min_price_cohort," +
> "bf_match_experiment AS bf_match_experiment," +
> "bep_matched_floor AS bep_matched_floor," +
> "SUM(p_ctr) AS p_ctr_total," +
> "SUM(p_ir) AS p_ir_total," +
> "SUM(p_cpa) AS p_cpa_total," +
> "SUM(arppu) AS arppu_total," +
> "SUM(spend) AS spend_total," +
> "SUM(cpm_price) AS cpm_price_total" +
> "FROM" +
> "PCOLLECTION" +
> "GROUP BY 
> exchange_name,strategy,platform,segment,auction_type" +
> ",placement_type,country,loss,no_bid_reason,app_bundle" +
> 
> ",model_id,identifier_type,promotion_id,sub_floor_bid_min_price_cohort" +
> ",bf_match_experiment,bep_matched_floor")
> );
>
>
> Can you please guide us?
>
> Let me know in case you need any more information.
>
> Goutham Miryala
> Senior Data Engineer
>
>
> 
>
>
>


Watermark Alignment on Flink Runner's UnboundedSourceWrapper

2023-05-19 Thread Talat Uyarer via user
Hi All,

I have a stream aggregation job which reads from Kafka and writes to some
sinks.

When I submit my job, the Flink checkpoint size keeps increasing if I use
unaligned checkpoint settings, and it does not emit any window results.
If I use aligned checkpoints, the size is somewhat under control (still big),
but checkpoint alignment takes a long time.

I would like to implement something similar to [1]. I believe that
if UnboundedSourceWrapper paused reading from partitions whose watermarks are
ahead, it would reduce the size of the checkpoint and I could use unaligned
checkpointing. What do you think about this approach? Do you have another
solution?

One more question: while reading code to implement the above idea, I saw
this code [2]. Does the Flink runner have a similar implementation?

Thanks

[1] https://github.com/apache/flink/pull/11968
[2]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L207
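To make the idea concrete, here is a minimal sketch of the pausing logic
(plain illustration code, not actual Beam or Flink runner APIs; the Reader
interface and the 5-minute drift bound are stand-ins): readers whose local
watermark runs too far ahead of the slowest reader are skipped for a poll
cycle, which bounds the in-flight state an unaligned checkpoint would
otherwise have to capture.

import java.time.Duration;
import java.util.List;

public class WatermarkAlignedPoller {

  // Stand-in for a per-partition reader; not a Beam or Flink interface.
  interface Reader {
    long getWatermarkMillis(); // current local watermark of this split
    void pollOnce();           // read and emit a batch from this split
  }

  private static final long MAX_DRIFT_MILLIS = Duration.ofMinutes(5).toMillis();

  public static void pollCycle(List<Reader> readers) {
    long minWatermark = readers.stream()
        .mapToLong(Reader::getWatermarkMillis)
        .min()
        .orElse(Long.MIN_VALUE);

    for (Reader reader : readers) {
      // Skip splits that are more than MAX_DRIFT_MILLIS ahead of the slowest one.
      if (reader.getWatermarkMillis() - minWatermark > MAX_DRIFT_MILLIS) {
        continue;
      }
      reader.pollOnce();
    }
  }
}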


Local Combiner for GroupByKey on Flink Streaming jobs

2023-05-19 Thread Talat Uyarer via user
Hi,

I have a stream aggregation job running on Flink 1.13. I generate the
DAG by using Beam SQL. My SQL query has a TUMBLE window. Basically, my
pipeline reads from Kafka, aggregates (counts/sums some values) by streaming
aggregation, and writes to a sink.

Beam SQL uses GroupByKey for the aggregation part. When I read the
translation code for the GroupByKey class in the Flink runner [1], I could
not see any local combiner. I see a ReduceFunction, but I believe it works on
the reducer side. If this is true, how can I implement a local combiner in
the source step to improve shuffle performance, or am I missing something?

If you need more information about my pipeline, I have shared some below.

Thanks
[1]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L905


This is my SQL query : "SELECT log_source_id, SUM(size) AS total_size FROM
PCOLLECTION  GROUP BY log_source_id, TUMBLE(log_time, INTERVAL '1' MINUTE)"
When I submit the job, Flink generates two fused steps, a source step and an
aggregation/sink step. I have shared the task names below.
First step, the source step:
Source:
Kafka_IO/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)
->
Flat Map ->
ParMultiDo(AvroBytesToRowConverter) ->
BeamCalcRel_47/ParDo(Calc)/ParMultiDo(Calc) ->
BeamAggregationRel_48/assignEventTimestamp/AddTimestamps/ParMultiDo(AddTimestamps)
->
BeamAggregationRel_48/Window.Into()/Window.Assign.out ->
BeamAggregationRel_48/Group.CombineFieldsByFields/ToKvs/selectKeys/AddKeys/Map/ParMultiDo(Anonymous)
->
ToBinaryKeyedWorkItem

Second step, the aggregation and sink step:

BeamAggregationRel_48/Group.CombineFieldsByFields/ToKvs/GroupByKey ->
ToGBKResult ->
BeamAggregationRel_48/Group.CombineFieldsByFields/Combine/ParDo(Anonymous)/ParMultiDo(Anonymous)
->
BeamAggregationRel_48/Group.CombineFieldsByFields/ToRow/ParMultiDo(Anonymous)
->
BeamAggregationRel_48/mergeRecord/ParMultiDo(Anonymous) ->
BeamCalcRel_49/ParDo(Calc)/ParMultiDo(Calc) ->
ParMultiDo(RowToOutputFormat) ->
ParMultiDo(SinkProcessor)
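For reference, the pre-shuffle ("local") combining that Flink's reduce-based
aggregations give you comes, in Beam, from CombineFn-based transforms such as
Combine.perKey, which a runner may lift into a partial combine before the
shuffle. Below is a hedged sketch of expressing the same per-minute SUM(size)
outside SQL; it assumes the Row fields log_source_id and size from the query
above and that event timestamps are already assigned from log_time.

import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class LocalCombineSketch {
  static PCollection<KV<String, Long>> totalSizePerSource(PCollection<Row> rows) {
    return rows
        .apply("KeyBySource",
            MapElements
                .into(TypeDescriptors.kvs(
                    TypeDescriptors.strings(), TypeDescriptors.longs()))
                .via(row -> KV.of(row.getString("log_source_id"), row.getInt64("size"))))
        .apply("OneMinuteWindows",
            Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1))))
        // Sum.longsPerKey() is a CombineFn-based aggregation, the form a runner
        // can split into a partial combine before the shuffle and a final merge after.
        .apply("SumSizes", Sum.longsPerKey());
  }
}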


Re: Beam SQL Alias issue while using With Clause

2023-03-02 Thread Talat Uyarer via user
Hi Andrew,

Thank you so much for your help. Sorry to hear you changed teams :( I can
handle a Calcite upgrade if there is a fix. I was working on a Calcite
upgrade, but then we started having so many issues; that's why I stopped
doing it.
Talat

On Thu, Mar 2, 2023 at 11:56 AM Andrew Pilloud  wrote:

> Hi Talat,
>
> I managed to turn your test case into something against Calcite. It
> looks like there is a bug affecting tables that contain one or more
> single element structs and no multi element structs. I've sent the
> details to the Calcite mailing list here.
>
> https://lists.apache.org/thread/tlr9hsmx09by79h91nwp2d4nv8jfwsto
>
> I'm experimenting with ideas on how to work around this but a fix will
> likely require a Calcite upgrade, which is not something I'd have time
> to help with. (I'm not on the Google Beam team anymore.)
>
> Andrew
>
> On Wed, Feb 22, 2023 at 12:18 PM Talat Uyarer
>  wrote:
> >
> > Hi @Andrew Pilloud
> >
> > Sorry for the late response. Yes, your test works fine. I changed
> the test input structure to match our input structure. Now this test also
> has the same exception.
> >
> > Feb 21, 2023 2:02:28 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> > INFO: SQL:
> > WITH `tempTable` AS (SELECT `panwRowTestTable`.`user_info`,
> `panwRowTestTable`.`id`, `panwRowTestTable`.`value`
> > FROM `beam`.`panwRowTestTable` AS `panwRowTestTable`
> > WHERE `panwRowTestTable`.`user_info`.`name` = 'innerStr') (SELECT
> `tempTable`.`user_info`, `tempTable`.`id`, `tempTable`.`value`
> > FROM `tempTable` AS `tempTable`)
> > Feb 21, 2023 2:02:28 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> > INFO: SQLPlan>
> > LogicalProject(user_info=[ROW($0)], id=[$1], value=[$2])
> >   LogicalFilter(condition=[=($0.name, 'innerStr')])
> > LogicalProject(name=[$0.name], id=[$1], value=[$2])
> >   BeamIOSourceRel(table=[[beam, panwRowTestTable]])
> >
> >
> > fieldList must not be null, type = VARCHAR
> > java.lang.AssertionError: fieldList must not be null, type = VARCHAR
> >
> > I dont know what is different from yours. I am sharing my version of the
> test also.
> >
> >
> > Index:
> sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> > IDEA additional info:
> > Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
> > <+>UTF-8
> > ===
> > diff --git
> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> > ---
> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> (revision fd383fae1adc545b6b6a22b274902cda956fec49)
> > +++
> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> (date 1677017032324)
> > @@ -54,6 +54,9 @@
> >private static final Schema innerRowSchema =
> >
> Schema.builder().addStringField("string_field").addInt64Field("long_field").build();
> >
> > +  private static final Schema innerPanwRowSchema =
> > +  Schema.builder().addStringField("name").build();
> > +
> >private static final Schema innerRowWithArraySchema =
> >Schema.builder()
> >.addStringField("string_field")
> > @@ -127,8 +130,12 @@
> >.build()))
> >.put(
> >"basicRowTestTable",
> > -  TestBoundedTable.of(FieldType.row(innerRowSchema),
> "col")
> > -
> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
> > +  TestBoundedTable.of(FieldType.row(innerRowSchema),
> "col", FieldType.INT64, "field")
> > +
> .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(),
> 1L))
> > +.put(
> > +  "panwRowTestTable",
> > +
> TestBoundedTable.of(FieldType.row(innerPanwRowSchema), "user_info",
> FieldType.INT64, "id", FieldType.STRING, "value")
> > +
> .addRow

Re: Beam SQL Alias issue while using With Clause

2023-02-22 Thread Talat Uyarer via user
> I don't believe the "LogicalFilter(condition=[=($2.name, 'User1')])",
> particularly "$2.name", is something that works; in my test it seems that
> the planner has flattened the complex input and reproduced a ROW at the
> output.
>
> INFO: SQLPlan>
> LogicalProject(col=[ROW($0, $1)], field=[$2])
>   LogicalFilter(condition=[=($0, 'innerStr')])
> LogicalProject(string_field=[$0.string_field],
> long_field=[$0.long_field], field=[$1])
>   BeamIOSourceRel(table=[[beam, basicRowTestTable]])
>
> Feb 10, 2023 6:07:35 PM
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
> INFO: BEAMPlan>
> BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t0.string_field],
> expr#3=[$t0.long_field], expr#4=[ROW($t2, $t3)],
> expr#5=['innerStr':VARCHAR], expr#6=[=($t2, $t5)], col=[$t4], field=[$t1],
> $condition=[$t6])
>   BeamIOSourceRel(table=[[beam, basicRowTestTable]])
>
> ---
> a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> +++
> b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java
> @@ -127,8 +127,8 @@ public class BeamComplexTypeTest {
>.build()))
>.put(
>"basicRowTestTable",
> -  TestBoundedTable.of(FieldType.row(innerRowSchema),
> "col")
> -
>  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
> +  TestBoundedTable.of(FieldType.row(innerRowSchema),
> "col", FieldType.INT64, "field")
> +
>  .addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build(),
> 1L))
>.put(
>"rowWithArrayTestTable",
>TestBoundedTable.of(FieldType.row(rowWithArraySchema),
> "col")
> @@ -220,6 +220,21 @@ public class BeamComplexTypeTest {
>  pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
>}
>
> +  @Test
> +  public void testBasicRowWhereField() {
> +BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
> +PCollection stream =
> +BeamSqlRelUtils.toPCollection(
> +pipeline, sqlEnv.parseQuery("WITH tempTable AS (SELECT * FROM
> basicRowTestTable WHERE basicRowTestTable.col.string_field = 'innerStr')
> SELECT * FROM tempTable"));
> +Schema outputSchema = Schema.builder().addRowField("col",
> innerRowSchema).addInt64Field("field").build();
> +PAssert.that(stream)
> +.containsInAnyOrder(
> +Row.withSchema(outputSchema)
> +
>  .addValues(Row.withSchema(innerRowSchema).addValues("innerStr",
> 1L).build(), 1L)
> +.build());
> +pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
> +  }
> +
>@Test
>public void testArrayConstructor() {
>  BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
>
>
> On Fri, Feb 3, 2023 at 2:06 PM Talat Uyarer 
> wrote:
>
>> Hi Andrew,
>>
>> Thank you for your MR. I appreciate your help in solving the issue. I
>> reran our tests and they are partially passing now with your fix. However,
>> there is one more issue with the WITH clause.
>>
>> When I run the following query, somehow Beam loses the type of a column.
>>
>> WITH tempTable AS (SELECT * FROM PCOLLECTION WHERE
>> PCOLLECTION.`user_info`.`name` = 'User1') SELECT * FROM tempTable
>>
>> I haven't tested on Beam master. I ran with your latest patch on our code
>> base. This is the output:
>>
>> 14:00:30.095 [Test worker] INFO
>>  o.a.b.sdk.extensions.sql.impl.CalciteQueryPlanner - SQL:
>> WITH `tempTable` AS (SELECT `PCOLLECTION`.`id`, `PCOLLECTION`.`value`,
>> `PCOLLECTION`.`user_info`
>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
>> WHERE `PCOLLECTION`.`user_info`.`name` = 'User1') (SELECT
>> `tempTable`.`id`, `tempTable`.`value`, `tempTable`.`user_info`
>> FROM `tempTable` AS `tempTable`)
>> 14:00:30.106 [Test worker] 

Re: Beam SQL Alias issue while using With Clause

2023-02-03 Thread Talat Uyarer via user
doesn't appear there are any calcite gradle rules to run
>>> CoreQuidemTest and constructing the classpath manually was tedious. Did I
>>> miss something?
>>>
>>> I'm still working on this but I'm out today and Monday, it will probably
>>> be Wednesday before I make any more progress.
>>>
>>> Andrew
>>>
>>> On Fri, Jan 27, 2023 at 10:40 AM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Hi Andrew,
>>>>
>>>> Yes This aligned also with my debugging. In My Kenn's reply you can see
>>>> a sql test which I wrote in Calcite. Somehow Calcite does not have this
>>>> issue with the 1.28 version.
>>>>
>>>> !use post
>>>> !set outputformat mysql
>>>>
>>>> #Test aliases with with clause
>>>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>>>> "hr"."emps"."name" as v from "hr"."emps")
>>>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
>>>> tempTable.v <> '11' ;
>>>> +-+---+
>>>> | ID  | value |
>>>> +-+---+
>>>> | 100 | Bill  |
>>>> | 110 | Theodore  |
>>>> | 150 | Sebastian |
>>>> | 200 | Eric  |
>>>> +-+---+
>>>> (4 rows)
>>>>
>>>> !ok
>>>>
>>>>
>>>> On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud 
>>>> wrote:
>>>>
>>>>> Yes, that worked.
>>>>>
>>>>> The issue does not occur if I disable all of the following planner
>>>>> rules: CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
>>>>> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE),
>>>>> and BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
>>>>>
>>>>> All the rules share a common call to RexProgramBuilder.mergePrograms,
>>>>> so I suspect the problem lies there. I spent some time looking but wasn't
>>>>> able to find it by code inspection, it looks like this code path is doing
>>>>> the right thing with names. I'll spend some time tomorrow trying to
>>>>> reproduce this on pure Calcite.
>>>>>
>>>>> Andrew
>>>>>
>>>>>
>>>>> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer <
>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi Andrew,
>>>>>>
>>>>>> Thanks for writing a test for this use case. Without Where clause it
>>>>>> works as expected on our test cases also too. Please add where clause on
>>>>>> second select. With the below query it does not return column names. I
>>>>>> tested on my local also.
>>>>>>
>>>>>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
>>>>>> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
>>>>>> id > 1
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud 
>>>>>> wrote:
>>>>>>
>>>>>>> +d...@beam.apache.org 
>>>>>>>
>>>>>>> I tried reproducing this but was not successful, the output schema
>>>>>>> was as expected. I added the following to 
>>>>>>> BeamSqlMultipleSchemasTest.java
>>>>>>> at head. (I did discover
>>>>>>> that  PAssert.that(result).containsInAnyOrder(output) doesn't validate
>>>>>>> column names however.)
>>>>>>>
>>>>>>>   @Test
>>>>>>>   public void testSelectAs() {
>>>>>>> PCollection input = pipeline.apply(create(row(1,
>>>>>>> "strstr")));
>>>>>>>
>>>>>>> PCollection result =
>>>>>>> input.apply(SqlTransform.query("WITH tempTable (id, v) AS
>>>>>>> (SELECT f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS 
>>>>>>> fout_int,
>>>>>>> v AS fout_string FROM tempTable"));
>>>>>>>
>>>>>>> Schema output_schema =
>

Re: How to submit beam python pipeline to GKE flink cluster

2023-02-03 Thread Talat Uyarer via user
Hi,

Do you use the Flink operator or a manually deployed session cluster?

Thanks

On Fri, Feb 3, 2023, 4:32 AM P Singh  wrote:

> Hi Team,
>
> I have set up a flink cluster on GKE and am trying to submit a beam
> pipeline with below options. I was able to run this on a local machine but
> I don't understand what would be the environment_config? What should I do?
> what to put here instead of localhost:5
>
> Please help.
> options = PipelineOptions([
> "--runner=FlinkRunner",
> "--flink_version=1.14",
> "--flink_master=localhost:8081",
> "--environment_type=EXTERNAL", #EXTERNAL
> "--environment_config=localhost:5",
> ])
>


Beam Portable Framework Question for Same SDK and Runner Language

2023-02-02 Thread Talat Uyarer via user
Hi,

I know we use the portability framework when the SDK language (Python) is
different from the runner's language (Java).

If my runner is Java-based and I want to use the portability framework with
the Java SDK, is there any optimization on the Beam side rather than running
two separate Docker images (one for the SDK, one for the runner) and
communicating via gRPC?

Thanks


Re: Beam SQL Alias issue while using With Clause

2023-01-27 Thread Talat Uyarer via user
Hi Andrew,

Yes, this also aligns with my debugging. In my reply to Kenn you can see a
SQL test which I wrote in Calcite. Somehow Calcite does not have this issue
with version 1.28.

!use post
!set outputformat mysql

#Test aliases with with clause
WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id,
"hr"."emps"."name" as v from "hr"."emps")
SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE
tempTable.v <> '11' ;
+-+---+
| ID  | value |
+-+---+
| 100 | Bill  |
| 110 | Theodore  |
| 150 | Sebastian |
| 200 | Eric  |
+-+---+
(4 rows)

!ok


On Wed, Jan 25, 2023 at 6:08 PM Andrew Pilloud  wrote:

> Yes, that worked.
>
> The issue does not occur if I disable all of the following planner rules:
> CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
> LogicalCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE),
> and BeamCalcMergeRule.INSTANCE (which wraps CoreRules.CALC_MERGE).
>
> All the rules share a common call to RexProgramBuilder.mergePrograms, so I
> suspect the problem lies there. I spent some time looking but wasn't able
> to find it by code inspection, it looks like this code path is doing the
> right thing with names. I'll spend some time tomorrow trying to reproduce
> this on pure Calcite.
>
> Andrew
>
>
> On Tue, Jan 24, 2023 at 8:24 PM Talat Uyarer 
> wrote:
>
>> Hi Andrew,
>>
>> Thanks for writing a test for this use case. Without Where clause it
>> works as expected on our test cases also too. Please add where clause on
>> second select. With the below query it does not return column names. I
>> tested on my local also.
>>
>> WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
>> PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
>> id > 1
>>
>> Thanks
>>
>> On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud 
>> wrote:
>>
>>> +d...@beam.apache.org 
>>>
>>> I tried reproducing this but was not successful, the output schema was
>>> as expected. I added the following to BeamSqlMultipleSchemasTest.java at
>>> head. (I did discover that  PAssert.that(result).containsInAnyOrder(output)
>>> doesn't validate column names however.)
>>>
>>>   @Test
>>>   public void testSelectAs() {
>>> PCollection input = pipeline.apply(create(row(1, "strstr")));
>>>
>>> PCollection result =
>>> input.apply(SqlTransform.query("WITH tempTable (id, v) AS
>>> (SELECT f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS fout_int,
>>> v AS fout_string FROM tempTable"));
>>>
>>> Schema output_schema =
>>>
>>> Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
>>> assertThat(result.getSchema(), equalTo(output_schema));
>>>
>>> Row output = Row.withSchema(output_schema).addValues(1,
>>> "strstr").build();
>>> PAssert.that(result).containsInAnyOrder(output);
>>> pipeline.run();
>>>   }
>>>
>>> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Hi Kenn,
>>>>
>>>> Thank you for replying back to my email.
>>>>
>>>> I was under the same impression about Calcite. But I wrote a test on
>>>> Calcite 1.28 too. It is working without issue that I see on BEAM
>>>>
>>>> Here is my test case. If you want you can also run on Calcite. Please
>>>> put under core/src/test/resources/sql as text file. and Run CoreQuidemTest
>>>> class.
>>>>
>>>> !use post
>>>> !set outputformat mysql
>>>>
>>>> #Test aliases with with clause
>>>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>>>> "hr"."emps"."name" as v from "hr"."emps")
>>>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
>>>> tempTable.v <> '11' ;
>>>> +-+---+
>>>> | ID  | value |
>>>> +-+---+
>>>> | 100 | Bill  |
>>>> | 110 | Theodore  |
>>>> | 150 | Sebastian |
>>>> | 200 | Eric  |
>>>> +-+---+
>>>> (4 rows)
>>>>
>>>> !ok
>>>>
>>>

Re: Beam SQL Alias issue while using With Clause

2023-01-24 Thread Talat Uyarer via user
Hi Andrew,

Thanks for writing a test for this use case. Without the WHERE clause it
works as expected on our test cases too. Please add a WHERE clause to the
second SELECT. With the query below it does not return column names. I
tested locally as well.

WITH tempTable (id, v) AS (SELECT f_int as id, f_string as v FROM
PCOLLECTION) SELECT id AS fout_int, v AS fout_string FROM tempTable WHERE
id > 1

Thanks

On Tue, Jan 24, 2023 at 5:28 PM Andrew Pilloud  wrote:

> +d...@beam.apache.org 
>
> I tried reproducing this but was not successful, the output schema was as
> expected. I added the following to BeamSqlMultipleSchemasTest.java at head.
> (I did discover that  PAssert.that(result).containsInAnyOrder(output)
> doesn't validate column names however.)
>
>   @Test
>   public void testSelectAs() {
> PCollection input = pipeline.apply(create(row(1, "strstr")));
>
> PCollection result =
> input.apply(SqlTransform.query("WITH tempTable (id, v) AS (SELECT
> f_int as id, f_string as v FROM PCOLLECTION) SELECT id AS fout_int, v AS
> fout_string FROM tempTable"));
>
> Schema output_schema =
>
> Schema.builder().addInt32Field("fout_int").addStringField("fout_string").build();
> assertThat(result.getSchema(), equalTo(output_schema));
>
> Row output = Row.withSchema(output_schema).addValues(1,
> "strstr").build();
> PAssert.that(result).containsInAnyOrder(output);
> pipeline.run();
>   }
>
> On Tue, Jan 24, 2023 at 8:13 AM Talat Uyarer 
> wrote:
>
>> Hi Kenn,
>>
>> Thank you for replying back to my email.
>>
>> I was under the same impression about Calcite. But I wrote a test on
>> Calcite 1.28 too. It is working without issue that I see on BEAM
>>
>> Here is my test case. If you want you can also run on Calcite. Please put
>> under core/src/test/resources/sql as text file. and Run CoreQuidemTest
>> class.
>>
>> !use post
>> !set outputformat mysql
>>
>> #Test aliases with with clause
>> WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id, 
>> "hr"."emps"."name" as v from "hr"."emps")
>> SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE 
>> tempTable.v <> '11' ;
>> +-+---+
>> | ID  | value |
>> +-+---+
>> | 100 | Bill  |
>> | 110 | Theodore  |
>> | 150 | Sebastian |
>> | 200 | Eric  |
>> +-+---+
>> (4 rows)
>>
>> !ok
>>
>>
>> On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles  wrote:
>>
>>> Looking at the code that turns a logical CalcRel into a BeamCalcRel I do
>>> not see any obvious cause for this:
>>> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
>>>
>>> I don't like to guess that upstream libraries have the bug, but in this
>>> case I wonder if the alias is lost in the Calcite optimizer rule for
>>> merging the projects and filters into a Calc.
>>>
>>> Kenn
>>>
>>> On Mon, Jan 23, 2023 at 10:13 AM Kenneth Knowles 
>>> wrote:
>>>
>>>> I am not sure I understand the question, but I do see an issue.
>>>>
>>>> Context: "CalcRel" is an optimized relational operation that is
>>>> somewhat like ParDo, with a small snippet of a single-assignment DSL
>>>> embedded in it. Calcite will choose to merge all the projects and filters
>>>> into the node, and then generates Java bytecode to directly execute the 
>>>> DSL.
>>>>
>>>> Problem: it looks like the CalcRel has output columns with aliases "id"
>>>> and "v" where it should have output columns with aliases "id" and "value".
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay  wrote:
>>>>
>>>>> Adding: @Andrew Pilloud  @Kenneth Knowles
>>>>> 
>>>>>
>

Re: Beam SQL Alias issue while using With Clause

2023-01-24 Thread Talat Uyarer via user
Hi Kenn,

Thank you for replying to my email.

I was under the same impression about Calcite, but I wrote a test on
Calcite 1.28 too. It works without the issue that I see in Beam.

Here is my test case. If you want, you can also run it in Calcite: put it
under core/src/test/resources/sql as a text file and run the CoreQuidemTest
class.

!use post
!set outputformat mysql

#Test aliases with with clause
WITH tempTable(id, v) AS (select "hr"."emps"."empid" as id,
"hr"."emps"."name" as v from "hr"."emps")
SELECT tempTable.id as id, tempTable.v as "value" FROM tempTable WHERE
tempTable.v <> '11' ;
+-+---+
| ID  | value |
+-+---+
| 100 | Bill  |
| 110 | Theodore  |
| 150 | Sebastian |
| 200 | Eric  |
+-+---+
(4 rows)

!ok


On Mon, Jan 23, 2023 at 10:16 AM Kenneth Knowles  wrote:

> Looking at the code that turns a logical CalcRel into a BeamCalcRel I do
> not see any obvious cause for this:
> https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69
>
> I don't like to guess that upstream libraries have the bug, but in this
> case I wonder if the alias is lost in the Calcite optimizer rule for
> merging the projects and filters into a Calc.
>
> Kenn
>
> On Mon, Jan 23, 2023 at 10:13 AM Kenneth Knowles  wrote:
>
>> I am not sure I understand the question, but I do see an issue.
>>
>> Context: "CalcRel" is an optimized relational operation that is somewhat
>> like ParDo, with a small snippet of a single-assignment DSL embedded in it.
>> Calcite will choose to merge all the projects and filters into the node,
>> and then generates Java bytecode to directly execute the DSL.
>>
>> Problem: it looks like the CalcRel has output columns with aliases "id"
>> and "v" where it should have output columns with aliases "id" and "value".
>>
>> Kenn
>>
>> On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay  wrote:
>>
>>> Adding: @Andrew Pilloud  @Kenneth Knowles
>>> 
>>>
>>> On Thu, Jan 12, 2023 at 12:31 PM Talat Uyarer via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am using Beam 2.43 with Calcite SQL with Java.
>>>>
>>>> I have a query with a WITH clause and some aliasing. Looks like Beam
>>>> Query optimizer after optimizing my query, it drops Select statement's
>>>> aliases. Can you help me to identify where the problem is ?
>>>>
>>>> This is my query
>>>> INFO: SQL:
>>>> WITH `tempTable` (`id`, `v`) AS (SELECT
>>>> `PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`,
>>>> `PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v`
>>>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`) (SELECT `tempTable`.`id` AS
>>>> `id`, `tempTable`.`v` AS `value`
>>>> FROM `tempTable` AS `tempTable`
>>>> WHERE `tempTable`.`v` <> '11')
>>>>
>>>> This is Calcite Plan look at LogicalProject(id=[$0], value=[$1]) in SQL
>>>> plan.
>>>>
>>>> Jan 12, 2023 12:19:08 PM
>>>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner 
>>>> convertToBeamRel
>>>> INFO: SQLPlan>
>>>> LogicalProject(id=[$0], value=[$1])
>>>>   LogicalFilter(condition=[<>($1, '11')])
>>>> LogicalProject(id=[$1.f_nestedInt], v=[$1.f_nestedString])
>>>>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>>>
>>>> But Beam Plan does not have a LogicalProject(id=[$0], value=[$1]) or
>>>> similar.
>>>>
>>>> Jan 12, 2023 12:19:08 PM
>>>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner 
>>>> convertToBeamRel
>>>> INFO: BEAMPlan>
>>>> BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t1.f_nestedInt],
>>>> expr#3=[$t1.f_nestedString], expr#4=['11':VARCHAR], expr#5=[<>($t3, $t4)],
>>>> id=[$t2], v=[$t3], $condition=[$t5])
>>>>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>>>
>>>>
>>>> Thanks
>>>>
>>>


Beam SQL Alias issue while using With Clause

2023-01-12 Thread Talat Uyarer via user
Hi All,

I am using Beam 2.43 with Calcite SQL with Java.

I have a query with a WITH clause and some aliasing. It looks like the Beam
query optimizer drops the SELECT statement's aliases after optimizing my
query. Can you help me identify where the problem is?

This is my query
INFO: SQL:
WITH `tempTable` (`id`, `v`) AS (SELECT
`PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`,
`PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`) (SELECT `tempTable`.`id` AS
`id`, `tempTable`.`v` AS `value`
FROM `tempTable` AS `tempTable`
WHERE `tempTable`.`v` <> '11')

This is Calcite Plan look at LogicalProject(id=[$0], value=[$1]) in SQL
plan.

Jan 12, 2023 12:19:08 PM
org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(id=[$0], value=[$1])
  LogicalFilter(condition=[<>($1, '11')])
LogicalProject(id=[$1.f_nestedInt], v=[$1.f_nestedString])
  BeamIOSourceRel(table=[[beam, PCOLLECTION]])

But Beam Plan does not have a LogicalProject(id=[$0], value=[$1]) or
similar.

Jan 12, 2023 12:19:08 PM
org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
INFO: BEAMPlan>
BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t1.f_nestedInt],
expr#3=[$t1.f_nestedString], expr#4=['11':VARCHAR], expr#5=[<>($t3, $t4)],
id=[$t2], v=[$t3], $condition=[$t5])
  BeamIOSourceRel(table=[[beam, PCOLLECTION]])


Thanks


Re: Beam slowness compared to flink-native

2022-05-10 Thread Talat Uyarer
Hi Ifat,

Did you enable the fasterCopy parameter?

Please look at this issue: https://issues.apache.org/jira/browse/BEAM-11146
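For reference, a minimal sketch of turning it on programmatically, assuming a
Beam release that includes the option added for that issue (it can also be
passed on the command line as --fasterCopy=true):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FasterCopyExample {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);
    // Skips the defensive deep copy between chained operators.
    // Only safe if no DoFn mutates its input or previously emitted output.
    options.setFasterCopy(true);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the Kafka-to-Kafka pipeline as in the message below ...
    pipeline.run();
  }
}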

Thanks

On Mon, May 2, 2022 at 12:57 AM Afek, Ifat (Nokia - IL/Kfar Sava) <
ifat.a...@nokia.com> wrote:

> Hi,
>
>
>
> I’m investigating a slowness of our beam pipelines, and as part of that I
> tried to compare a very simple beam pipeline with an equivalent
> flink-native pipeline. Both pipelines should read strings from one kafka
> topic and write them to another topic. I’m using beam 2.38.0 and flink
> 1.13.
>
> I tried running each pipeline separately, on a single task manager with a
> single slot and parallelism 1. What I saw is that Flink native runs 5 times
> faster than beam (150,000 strings per second in flink comparing to 30,000
> in beam).
>
>
>
> I’ll be happy if you can help me figure out why there is such a
> difference. Maybe the pipelines are not really equivalent, or the beam
> configuration is wrong?
>
>
>
>
>
> Flink native pipeline:
>
>
>
> public void process() throws Exception {
>
> StreamExecutionEnvironment environment =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>
>
> Properties properties = new Properties();
>
> properties.setProperty("bootstrap.servers", kafkaAddress);
>
> properties.setProperty("group.id", KAFKA_GROUP_ID);
>
> FlinkKafkaConsumer<String> consumer = new
> FlinkKafkaConsumer<>(INPUT_TOPIC, new SimpleStringSchema(), properties);
>
> consumer.setStartFromEarliest();
>
>
> consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
>
>
>
> FlinkKafkaProducer<String> producer = new
> FlinkKafkaProducer<>(kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());
>
>
>
> DataStream<String> inputMessagesStream =
> environment.addSource(consumer);
>
> inputMessagesStream.addSink(producer);
>
>
>
> environment.execute();
>
> }
>
>
>
>
>
> Beam pipeline:
>
>
>
> public static void main(String[] args) {
>
> try {
>
> StreamingOptions options =
> PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
>
> options.setStreaming(true);
>
> options.setRunner(FlinkRunner.class);
>
> Pipeline pipeline = Pipeline.create(options);
>
>
>
> PTransform<PBegin, PCollection<KV<String, String>>> transform
> = KafkaIO.<String, String>read()
>
> .withBootstrapServers(bootstrapServers)
>
> .withTopic(inputTopic)
>
> .withKeyDeserializer(StringDeserializer.class)
>
> .withValueDeserializer(StringDeserializer.class)
>
> .withConsumerConfigUpdates((ImmutableMap.of(
>
> "auto.offset.reset", "earliest",
>
> "group.id", consumerGroup)))
>
> .withoutMetadata();
>
>
>
> PCollection<KV<String, String>> input =
> pipeline.apply("readFromKafka", transform);
>
>
>
> PCollection<ProducerRecord<String, String>> convertedInput =
>
> input.apply("ConvertToStringRecord",
>
> ParDo.of(new
> ConvertToStringRecord(outputTopic) {}))
>
> .setCoder(new
> ProducerRecordCoder<>(StringUtf8Coder.of(), StringUtf8Coder.of()));
>
>
>
> KafkaIO.WriteRecords<String, String> writeToAvro =
> KafkaIO.<String, String>writeRecords()
>
> .withBootstrapServers(bootstrapServers)
>
> .withTopic(outputTopic)
>
> .withKeySerializer(StringSerializer.class)
>
> .withValueSerializer(StringSerializer.class);
>
>
>
> convertedInput.apply("writeToKafka", writeToAvro);
>
> pipeline.run();
>
>
>
> } catch (Exception e) {
>
> log.atError().withThrowable(e).log("Exception thrown while
> running pipeline PipelineStringToString");
>
> }
>
> }
>
>
>
> @Log4j2
>
> @AllArgsConstructor
>
> public class ConvertToStringRecord extends DoFn<KV<String, String>,
> ProducerRecord<String, String>> {
>
> private String topic;
>
>
>
> private static ProducerRecord<String, String> getRecord(KV<String, String> message, String topic) {
>
> String string = message.getValue();
>
> ProducerRecord<String, String> pr = new ProducerRecord<>(topic,
> message.getKey(), string) {};
>
> pr.headers().add("__TypeId__",
> String.class.getName().getBytes(StandardCharsets.UTF_8));
>
> return pr;
>
> }
>
>
>
> @ProcessElement
>
> public void processElement(ProcessContext c) {
>
> try {
>
> ProducerRecord<String, String> pr =
> getRecord(Objects.requireNonNull(c.element()), topic);
>
> c.output(pr);
>
> } catch (Exception e) {
>
> log.atError().withThrowable(e).log("exception thrown while
> processing string");
>
> }
>
> }
>
> }
>
>
>
>
>
> Thanks,
>
> Ifat
>
>
>


Re: Calculating Top K element per window with Beam SQL

2021-07-29 Thread Talat Uyarer
Hi Andrew,

Sorry for the late reply. Yes, I removed that statement and now it is working.

Thank you so much for the suggestion.

On Fri, Jul 16, 2021 at 12:26 PM Andrew Pilloud  wrote:

> Hi Talat,
>
> The syntax you've described actually does turn into the Top transform in
> Beam SQL
> <https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java#L215>
> However the code currently limits it to the global window. I assume your
> sample query fails with an error like "`ORDER BY` is only supported for
> global window...". I don't think there is a way to do this today, but I
> also wouldn't assume this syntax is necessarily wrong. If you remove the
> window check does it work? (Delete this if block:
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java#L200
> )
>
> Andrew
>
>
> On Fri, Jul 9, 2021 at 12:57 PM Talat Uyarer 
> wrote:
>
>> Hi,
>>
>> I am trying to calculate the top k element based on a streaming
>> aggregation per window. Do you know if there is any support for it
>> on BeamSQL or How can achieve this goal with BeamSQL on stream ?
>>
>> Sample Query
>> SELECT customer_id, app, sum(bytes_total) as bytes_total FROM
>> PCOLLECTION  GROUP BY customer_id, app, TUMBLE(log_time, INTERVAL '1'
>> MINUTE) ORDER BY bytes_total DESC LIMIT 10
>>
>> Thanks
>>
>>


Calculating Top K element per window with Beam SQL

2021-07-09 Thread Talat Uyarer
Hi,

I am trying to calculate the top K elements, based on a streaming
aggregation, per window. Do you know if there is any support for this in
Beam SQL, or how can I achieve this goal with Beam SQL on a stream?

Sample Query
SELECT customer_id, app, sum(bytes_total) as bytes_total FROM PCOLLECTION
GROUP BY customer_id, app, TUMBLE(log_time, INTERVAL '1' MINUTE) ORDER BY
bytes_total DESC LIMIT 10
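For illustration, one possible workaround (a hedged sketch, not a confirmed
Beam SQL feature) is to let Beam SQL do the per-window GROUP BY / SUM without
the ORDER BY ... LIMIT, and then take the top 10 rows per window with the Top
transform in plain Java. It assumes the aggregated rows carry a bytes_total
column as in the query above.

import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class TopKPerWindow {

  // Serializable comparator ordering aggregated rows by their bytes_total column.
  static class ByBytesTotal implements Comparator<Row>, Serializable {
    @Override
    public int compare(Row a, Row b) {
      return Long.compare(a.getInt64("bytes_total"), b.getInt64("bytes_total"));
    }
  }

  static PCollection<List<Row>> top10(PCollection<Row> aggregated) {
    // Top.of keeps the 10 largest rows per window according to the comparator;
    // withoutDefaults() is required because the input is in a non-global window.
    return aggregated.apply("Top10PerWindow",
        Top.of(10, new ByBytesTotal()).withoutDefaults());
  }
}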

Thanks


How Beam SQL Side Input refresh/update

2021-05-07 Thread Talat Uyarer
Hi,

Based on the Join documentation, if I have a Join with an unbounded and a bounded input:

> For this type of JOIN bounded input is treated as a side-input by the
> implementation. This means that window/trigger is inherited from upstreams.


In my pipeline I don't have any triggering or windowing. I use a global
window on the unbounded side. Basically, I read data from Kafka and I want to
join it with static data to enrich the Kafka message. Infrequently, I want to
update my static data. I am trying to understand how I can refresh the side
input when I update my static data.
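For context, one commonly suggested alternative is the slowly-updating side
input pattern, applied outside the SQL join: reload the static data on a
timer, re-publish it to a global-window side input, and do the enrichment
lookup in a plain ParDo. A rough sketch is below; loadStaticData() and the
one-hour refresh interval are placeholders, not real code.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

public class RefreshingSideInput {

  static Map<String, String> loadStaticData() {
    return new HashMap<>(); // placeholder: read the enrichment table here
  }

  static PCollectionView<Map<String, String>> build(Pipeline pipeline) {
    return pipeline
        // Emit a tick every hour; each tick triggers a reload of the static data.
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
        .apply("ReloadStaticData", ParDo.of(
            new DoFn<Long, Map<String, String>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                c.output(loadStaticData());
              }
            }))
        // Re-fire the global-window side input whenever a new snapshot arrives.
        .apply(Window.<Map<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        .apply(View.asSingleton());
  }
}

The Kafka side then stays in the global window, and a ParDo registered with
.withSideInputs(view) reads the latest snapshot via c.sideInput(view). This
bypasses the SQL JOIN, which only sees the bounded input as it was at
pipeline construction time.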

Thanks


Re: Apache Beam SQL and UDF

2021-02-10 Thread Talat Uyarer
Thanks, Rui, for reminding me about the lifecycle of a UDF. It looks like
there isn't any lifecycle. I checked the code, and it looks like we create a
new UDF instance for each message:

org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.SqlFunctions.isTrue(new
> com.paloaltonetworks.cortex.streamcompute.functions.MyUDFFunction().apply(current.getString(2))


Do you think we should create the UDF instance in the setup function and
call it in processElement? Do you see anything besides this? Or how can I
achieve my goal in a different way? I thought I could use the TableProvider
approach, but it does not update data with any mechanism.
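To make the idea concrete, below is a rough sketch of the workaround being
discussed: because the generated code may instantiate the UDF per record, the
mutable state lives in static (per-worker JVM) fields that a single daemon
thread refreshes on an interval. MyFilterFn, loadAllowedValues() and the
one-hour interval are illustrative, not our real code.

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;

public class MyFilterFn implements BeamSqlUdf {

  private static volatile Set<String> allowed = Collections.emptySet();
  private static final ScheduledExecutorService REFRESHER =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "udf-state-refresher");
        t.setDaemon(true);
        return t;
      });

  static {
    allowed = loadAllowedValues();
    REFRESHER.scheduleAtFixedRate(
        () -> { allowed = loadAllowedValues(); }, 1, 1, TimeUnit.HOURS);
  }

  private static Set<String> loadAllowedValues() {
    return Collections.emptySet(); // placeholder: fetch the real state here
  }

  // Beam SQL calls the static eval method; per-record instances stay cheap
  // because they carry no state of their own.
  public static Boolean eval(String value) {
    return allowed.contains(value);
  }
}

It could be registered with SqlTransform.registerUdf("my_filter",
MyFilterFn.class) and used in the WHERE clause.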

Thanks


On Wed, Feb 10, 2021 at 12:41 PM Talat Uyarer 
wrote:

> Does beam create UDF function for every bundle or in setup of pipeline ?
>
> I will keep internal state in memory. The Async thread will update that in
> memory state based on an interval such as every hour etc. If beam keeps UDF
> instance more than one bundle it is ok for me.
>
>
> On Wed, Feb 10, 2021, 12:37 PM Rui Wang  wrote:
>
>> The problem that I can think of is maybe before the async call is
>> completed, the UDF life cycle has reached to the end.
>>
>>
>> -Rui
>>
>> On Wed, Feb 10, 2021 at 12:34 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi,
>>>
>>> We plan to use UDF on our sql. We want to achieve some kind of
>>> filtering based on internal states. We want to update that internal state
>>> with a separate async thread in UDF. Before implementing that thing I want
>>> to get your options. Is there any limitation for UDF to have multi-thread
>>> implementation ?  Our UDF is a scalar function. It will get 1 or 2 input
>>> and return boolean.
>>>
>>> I will appreciate your comments in advance.
>>>
>>> Thanks
>>>
>>


Re: Apache Beam SQL and UDF

2021-02-10 Thread Talat Uyarer
Does Beam create the UDF function for every bundle, or at pipeline setup?

I will keep internal state in memory. The async thread will update that
in-memory state on an interval, such as every hour. If Beam keeps the UDF
instance for more than one bundle, that is OK for me.


On Wed, Feb 10, 2021, 12:37 PM Rui Wang  wrote:

> The problem that I can think of is maybe before the async call is
> completed, the UDF life cycle has reached to the end.
>
>
> -Rui
>
> On Wed, Feb 10, 2021 at 12:34 PM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
>
>> Hi,
>>
>> We plan to use UDF on our sql. We want to achieve some kind of
>> filtering based on internal states. We want to update that internal state
>> with a separate async thread in UDF. Before implementing that thing I want
>> to get your options. Is there any limitation for UDF to have multi-thread
>> implementation ?  Our UDF is a scalar function. It will get 1 or 2 input
>> and return boolean.
>>
>> I will appreciate your comments in advance.
>>
>> Thanks
>>
>


Apache Beam SQL and UDF

2021-02-10 Thread Talat Uyarer
Hi,

We plan to use a UDF in our SQL. We want to achieve some kind of filtering
based on internal state. We want to update that internal state with a
separate async thread in the UDF. Before implementing it, I want to get your
opinions. Is there any limitation on a UDF having a multi-threaded
implementation? Our UDF is a scalar function; it will take 1 or 2 inputs and
return a boolean.

I will appreciate your comments in advance.

Thanks


Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Talat Uyarer
to evaluate the WHERE clause, and then convert back to Avro to
>>>>>> output those messages. I believe this is where we "lose" the unknown
>>>>>> messages,but this is an implementation artifact - in theory we could 
>>>>>> output
>>>>>> the original bytes whenever we see a SELECT *. This is not truly a 
>>>>>> dynamic
>>>>>> schema, since you can't really do anything with these extra fields except
>>>>>> forward them to your output.
>>>>>>
>>>>>> I see two possible ways to address this.
>>>>>>
>>>>>> 1. As I mentioned above, in the case of a SELECT * we could output
>>>>>> the original bytes, and only use the Beam Row for evaluating the WHERE
>>>>>> clause. This might be very expensive though - we risk having to keep two
>>>>>> copies of every message around, one in the original Avro format and one 
>>>>>> in
>>>>>> Row format.
>>>>>>
>>>>>> 2. The other way would be to do what protocol buffers do. We could
>>>>>> add one extra field to the inferred Beam schema to store new, unknown
>>>>>> fields (probably this would be a map-valued field). This extra field 
>>>>>> would
>>>>>> simply store the raw bytes of these unknown fields, and then when
>>>>>> converting back to Avro they would be added to the output message. This
>>>>>> might also add some overhead to the pipeline, so might be best to make 
>>>>>> this
>>>>>> behavior opt in.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette 
>>>>>> wrote:
>>>>>>
>>>>>>> Reuven, could you clarify what you have in mind? I know multiple
>>>>>>> times we've discussed the possibility of adding update compatibility
>>>>>>> support to SchemaCoder, including support for certain schema changes 
>>>>>>> (field
>>>>>>> additions/deletions) - I think the most recent discussion was here [1].
>>>>>>>
>>>>>>> But it sounds like Talat is asking for something a little beyond
>>>>>>> that, effectively a dynamic schema. Is that something you think we can
>>>>>>> support?
>>>>>>>
>>>>>>> [1]
>>>>>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>>>>>
>>>>>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax  wrote:
>>>>>>>
>>>>>>>> Thanks. It might be theoretically possible to do this (at least for
>>>>>>>> the case where existing fields do not change). Whether anyone 
>>>>>>>> currently has
>>>>>>>> available time to do this is a different question, but it's something 
>>>>>>>> that
>>>>>>>> can be looked into.
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>
>>>>>>>>> Adding new fields is more common than modifying existing fields.
>>>>>>>>> But type change is also possible for existing fields, such as regular
>>>>>>>>> mandatory field(string,integer) to union(nullable field). No field
>>>>>>>>> deletion.
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> And when you say schema changes, are these new fields being added
>>>>>>>>>> to the schema? Or are you making changes to the existing fields?
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 

Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Talat Uyarer
>>>>> store the raw bytes of these unknown fields, and then when converting back
>>>>> to Avro they would be added to the output message. This might also add 
>>>>> some
>>>>> overhead to the pipeline, so might be best to make this behavior opt in.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette 
>>>>> wrote:
>>>>>
>>>>>> Reuven, could you clarify what you have in mind? I know multiple
>>>>>> times we've discussed the possibility of adding update compatibility
>>>>>> support to SchemaCoder, including support for certain schema changes 
>>>>>> (field
>>>>>> additions/deletions) - I think the most recent discussion was here [1].
>>>>>>
>>>>>> But it sounds like Talat is asking for something a little beyond
>>>>>> that, effectively a dynamic schema. Is that something you think we can
>>>>>> support?
>>>>>>
>>>>>> [1]
>>>>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>>>>
>>>>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax  wrote:
>>>>>>
>>>>>>> Thanks. It might be theoretically possible to do this (at least for
>>>>>>> the case where existing fields do not change). Whether anyone currently 
>>>>>>> has
>>>>>>> available time to do this is a different question, but it's something 
>>>>>>> that
>>>>>>> can be looked into.
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Adding new fields is more common than modifying existing fields.
>>>>>>>> But type change is also possible for existing fields, such as regular
>>>>>>>> mandatory field(string,integer) to union(nullable field). No field
>>>>>>>> deletion.
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax  wrote:
>>>>>>>>
>>>>>>>>> And when you say schema changes, are these new fields being added
>>>>>>>>> to the schema? Or are you making changes to the existing fields?
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> For sure let me explain a little bit about my pipeline.
>>>>>>>>>> My Pipeline is actually simple
>>>>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn,
>>>>>>>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back
>>>>>>>>>> from Row to Avro (DoFn)-> Write DB/GCS/GRPC etc
>>>>>>>>>>
>>>>>>>>>> On our jobs We have three type sqls
>>>>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>>>>> - SELECT * FROM PCOLLECTION 
>>>>>>>>>> - SQL Projection with or without Where clause  SELECT col1, col2
>>>>>>>>>> FROM PCOLLECTION
>>>>>>>>>>
>>>>>>>>>> We know writerSchema for each message. While deserializing avro
>>>>>>>>>> binary we use writer schema and reader schema on Convert Avro Bytes 
>>>>>>>>>> to Beam
>>>>>>>>>> Row step. It always produces a reader schema's generic record and we
>>>>>>>>>> convert that generic record to Row.
>>>>>>>>>> While submitting DF job we use latest schema to generate
>>>>>>>>>> 

Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Talat Uyarer
Thanks Reuven,

I can work on that. I know the internals of Beam SQL. I could not figure out
how to replace a step's code with newly generated code after the pipeline is
submitted. Could you share your thoughts on this?

Thanks

On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax  wrote:

> Thanks. It might be theoretically possible to do this (at least for the
> case where existing fields do not change). Whether anyone currently has
> available time to do this is a different question, but it's something that
> can be looked into.
>
> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer 
> wrote:
>
>> Adding new fields is more common than modifying existing fields. But type
>> change is also possible for existing fields, such as regular mandatory
>> field(string,integer) to union(nullable field). No field deletion.
>>
>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax  wrote:
>>
>>> And when you say schema changes, are these new fields being added to the
>>> schema? Or are you making changes to the existing fields?
>>>
>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>> For sure let me explain a little bit about my pipeline.
>>>> My Pipeline is actually simple
>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn,
>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from
>>>> Row to Avro (DoFn)-> Write DB/GCS/GRPC etc
>>>>
>>>> On our jobs We have three type sqls
>>>> - SELECT * FROM PCOLLECTION
>>>> - SELECT * FROM PCOLLECTION 
>>>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>>>> PCOLLECTION
>>>>
>>>> We know writerSchema for each message. While deserializing avro binary
>>>> we use writer schema and reader schema on Convert Avro Bytes to Beam Row
>>>> step. It always produces a reader schema's generic record and we convert
>>>> that generic record to Row.
>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>
>>>> In the current scenario When we have schema changes first we restart
>>>> all 15k jobs with the latest updated schema then whenever we are done we
>>>> turn on the latest schema for writers. Because of Avro's GrammerResolver[1]
>>>> we read different versions of the schema and we always produce the latest
>>>> schema's record. Without breaking our pipeline we are able to handle
>>>> multiple versions of data in the same streaming pipeline. If we can
>>>> generate SQL's java code when we get notified wirth latest schema we will
>>>> handle all schema changes. The only remaining obstacle is Beam's SQL Java
>>>> code. That's why I am looking for some solution. We dont need multiple
>>>> versions of SQL. We only need to regenerate SQL schema with the latest
>>>> schema on the fly.
>>>>
>>>> I hope I can explain it :)
>>>>
>>>> Thanks
>>>>
>>>> [1]
>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>
>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax  wrote:
>>>>
>>>>> Can you explain the use case some more? Are you wanting to change your
>>>>> SQL statement as well when the schema changes? If not, what are those new
>>>>> fields doing in the pipeline? What I mean is that your old SQL statement
>>>>> clearly didn't reference those fields in a SELECT statement since they
>>>>> didn't exist, so what are you missing by not having them unless you are
>>>>> also changing the SQL statement?
>>>>>
>>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>>> those fields are included?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi Andrew,
>>>>>>
>>>>>> I assume SQL query is not going to change. Changing things is the Row
>

Re: About Beam SQL Schema Changes and Code generation

2020-12-07 Thread Talat Uyarer
Adding new fields is more common than modifying existing fields, but a type
change is also possible for existing fields, such as a regular mandatory
field (string, integer) becoming a union (nullable field). There are no
field deletions.

On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax  wrote:

> And when you say schema changes, are these new fields being added to the
> schema? Or are you making changes to the existing fields?
>
> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer 
> wrote:
>
>> Hi,
>> For sure let me explain a little bit about my pipeline.
>> My Pipeline is actually simple
>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn,
>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from Row
>> to Avro (DoFn)-> Write DB/GCS/GRPC etc
>>
>> On our jobs We have three type sqls
>> - SELECT * FROM PCOLLECTION
>> - SELECT * FROM PCOLLECTION 
>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>> PCOLLECTION
>>
>> We know writerSchema for each message. While deserializing avro binary we
>> use writer schema and reader schema on Convert Avro Bytes to Beam Row step.
>> It always produces a reader schema's generic record and we convert that
>> generic record to Row.
>> While submitting DF job we use latest schema to generate beamSchema.
>>
>> In the current scenario When we have schema changes first we restart all
>> 15k jobs with the latest updated schema then whenever we are done we turn
>> on the latest schema for writers. Because of Avro's GrammerResolver[1] we
>> read different versions of the schema and we always produce the latest
>> schema's record. Without breaking our pipeline we are able to handle
>> multiple versions of data in the same streaming pipeline. If we can
>> generate SQL's java code when we get notified wirth latest schema we will
>> handle all schema changes. The only remaining obstacle is Beam's SQL Java
>> code. That's why I am looking for some solution. We dont need multiple
>> versions of SQL. We only need to regenerate SQL schema with the latest
>> schema on the fly.
>>
>> I hope I can explain it :)
>>
>> Thanks
>>
>> [1]
>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>
>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax  wrote:
>>
>>> Can you explain the use case some more? Are you wanting to change your
>>> SQL statement as well when the schema changes? If not, what are those new
>>> fields doing in the pipeline? What I mean is that your old SQL statement
>>> clearly didn't reference those fields in a SELECT statement since they
>>> didn't exist, so what are you missing by not having them unless you are
>>> also changing the SQL statement?
>>>
>>> Is this a case where you have a SELECT *, and just want to make sure
>>> those fields are included?
>>>
>>> Reuven
>>>
>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Hi Andrew,
>>>>
>>>> I assume SQL query is not going to change. Changing things is the Row
>>>> schema by adding new columns or rename columns. if we keep a version
>>>> information on somewhere for example a KV pair. Key is schema information,
>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>>>> pipelines. When we have a schema change we restart a 15k DF job which is
>>>> pain. I am looking for a possible way to avoid job restart. Dont you think
>>>> it is not still doable ?
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud 
>>>> wrote:
>>>>
>>>>> Unfortunately we don't have a way to generate the SQL Java code on the
>>>>> fly, even if we did, that wouldn't solve your problem. I believe our
>>>>> recommended practice is to run both the old and new pipeline for some 
>>>>> time,
>>>>> then pick a window boundary to transition the output from the old pipeline
>>>>> to the new one.
>>>>>
>>>>> Beam doesn't handle changing the format of data sent between
>>>>>

Re: About Beam SQL Schema Changes and Code generation

2020-12-07 Thread Talat Uyarer
Hi,
Sure, let me explain a little bit about my pipeline. It is actually simple:
Read Kafka -> Convert Avro Bytes to Beam Row (a DoFn<..., Row>) ->
Apply Filter (SqlTransform.query(sql)) -> Convert back from Row to Avro
(DoFn) -> Write DB/GCS/GRPC etc.

In our jobs we have three types of SQL:
- SELECT * FROM PCOLLECTION
- SELECT * FROM PCOLLECTION with a WHERE clause
- a SQL projection with or without a WHERE clause: SELECT col1, col2 FROM
PCOLLECTION

We know the writer schema for each message. While deserializing the Avro binary we
use the writer schema and the reader schema in the Convert Avro Bytes to Beam Row step;
it always produces a generic record in the reader schema, and we convert that
generic record to a Row. While submitting the DF job we use the latest schema to
generate the Beam schema.
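
A minimal sketch of that conversion step, assuming the writer schema, reader schema, and
Beam schema are already available (class and variable names here are illustrative, not our
production code):

import java.io.IOException;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;

class AvroBytesToRow {
  static Row convert(byte[] payload,
                     org.apache.avro.Schema writerSchema,
                     org.apache.avro.Schema readerSchema,
                     org.apache.beam.sdk.schemas.Schema beamSchema) throws IOException {
    // Avro schema resolution: decode with the writer schema, resolve into the reader schema.
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<>(writerSchema, readerSchema);
    GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
    // The resolved record is in the reader (latest) schema; convert it to a Beam Row.
    return AvroUtils.toBeamRowStrict(record, beamSchema);
  }
}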

In the current scenario, when we have schema changes we first restart all
15k jobs with the latest schema, and once that is done we turn on the latest
schema for the writers. Because of Avro's grammar-based schema resolution [1]
we can read different versions of the schema and always produce a record in
the latest schema, so we are able to handle multiple versions of data in the
same streaming pipeline without breaking it. If we could regenerate the SQL's
Java code when we get notified with the latest schema, we would handle all
schema changes. The only remaining obstacle is Beam's generated SQL Java
code; that's why I am looking for a solution. We don't need multiple
versions of the SQL, we only need to regenerate the SQL schema with the
latest schema on the fly.

I hope that explains it :)

Thanks

[1]
https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html

On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax  wrote:

> Can you explain the use case some more? Are you wanting to change your SQL
> statement as well when the schema changes? If not, what are those new
> fields doing in the pipeline? What I mean is that your old SQL statement
> clearly didn't reference those fields in a SELECT statement since they
> didn't exist, so what are you missing by not having them unless you are
> also changing the SQL statement?
>
> Is this a case where you have a SELECT *, and just want to make sure those
> fields are included?
>
> Reuven
>
> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer 
> wrote:
>
>> Hi Andrew,
>>
>> I assume SQL query is not going to change. Changing things is the Row
>> schema by adding new columns or rename columns. if we keep a version
>> information on somewhere for example a KV pair. Key is schema information,
>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>> pipelines. When we have a schema change we restart a 15k DF job which is
>> pain. I am looking for a possible way to avoid job restart. Dont you think
>> it is not still doable ?
>>
>> Thanks
>>
>>
>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud 
>> wrote:
>>
>>> Unfortunately we don't have a way to generate the SQL Java code on the
>>> fly, even if we did, that wouldn't solve your problem. I believe our
>>> recommended practice is to run both the old and new pipeline for some time,
>>> then pick a window boundary to transition the output from the old pipeline
>>> to the new one.
>>>
>>> Beam doesn't handle changing the format of data sent between
>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>> data between steps of the pipeline. The builtin coders (including the
>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>> schema evolution. They are optimized for performance at all costs.
>>>
>>> If you worked around this, the Beam model doesn't support changing the
>>> structure of the pipeline graph. This would significantly limit the changes
>>> you can make. It would also require some changes to SQL to try to produce
>>> the same plan for an updated SQL query.
>>>
>>> Andrew
>>>
>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>> format. We generate our rows based on our Avro schema. Over time the schema
>>>> is changing. I believe Beam SQL generates Java code based on what we define
>>>> as BeamSchema while submitting the pipeline. Do you have any idea How can
>>>> we handle schema changes with resubmitting our beam job. Is it possible to
>>>> generate SQL java code on the fly ?
>>>>
>>>> Thanks
>>>>
>>>


Re: About Beam SQL Schema Changes and Code generation

2020-12-07 Thread Talat Uyarer
Hi Andrew,

I assume the SQL query is not going to change; what changes is the Row
schema, by adding new columns or renaming columns. If we keep version
information somewhere, for example as a KV pair where the key is the schema
information and the value is the Row, can't we regenerate the SQL code? The
reason I am asking is that we have 15k pipelines. When we have a schema
change we restart 15k DF jobs, which is painful. I am looking for a possible
way to avoid the job restart. Don't you think it is still doable?

Thanks


On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud  wrote:

> Unfortunately we don't have a way to generate the SQL Java code on the
> fly, even if we did, that wouldn't solve your problem. I believe our
> recommended practice is to run both the old and new pipeline for some time,
> then pick a window boundary to transition the output from the old pipeline
> to the new one.
>
> Beam doesn't handle changing the format of data sent between intermediate
> steps in a running pipeline. Beam uses "coders" to serialize data between
> steps of the pipeline. The builtin coders (including the Schema Row Coder
> used by SQL) have a fixed data format and don't handle schema evolution.
> They are optimized for performance at all costs.
>
> If you worked around this, the Beam model doesn't support changing the
> structure of the pipeline graph. This would significantly limit the changes
> you can make. It would also require some changes to SQL to try to produce
> the same plan for an updated SQL query.
>
> Andrew
>
> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer 
> wrote:
>
>> Hi,
>>
>> We are using Beamsql on our pipeline. Our Data is written in Avro format.
>> We generate our rows based on our Avro schema. Over time the schema is
>> changing. I believe Beam SQL generates Java code based on what we define as
>> BeamSchema while submitting the pipeline. Do you have any idea How can we
>> handle schema changes with resubmitting our beam job. Is it possible to
>> generate SQL java code on the fly ?
>>
>> Thanks
>>
>


About Beam SQL Schema Changes and Code generation

2020-12-07 Thread Talat Uyarer
Hi,

We are using Beam SQL in our pipeline. Our data is written in Avro format,
and we generate our rows based on our Avro schema. Over time the schema is
changing. I believe Beam SQL generates Java code based on what we define as
the Beam schema while submitting the pipeline. Do you have any idea how we can
handle schema changes without resubmitting our Beam job? Is it possible to
generate the SQL Java code on the fly?

Thanks


Re: OOM issue on Dataflow Worker by doing string manipulation

2020-09-03 Thread Talat Uyarer
Hi,

One more update. Sorry, in the code sample that I shared I put the
StringBuilder under the setup function, but actually it was in the
startBundle function. So far I have tested the scenarios below:
- with StringWriter, constructing the object on every processElement call
- with StringBuilder, constructing the object on every processElement call
- with StringBuilder, constructing the object on every startBundle call (and also
tried setLength(0) and delete(0, sb.length()) to clean the StringBuilder)

None of these cases prevents the DF job from getting the error below.

> Shutting down JVM after 8 consecutive periods of measured GC thrashing.
> Memory is used/total/max = 4112/5994/5994 MB, GC last/max = 97.36/97.36 %,
> #pushbacks=3, gc thrashing=true. Heap dump not written.


Also, my processing rate is 4kps per instance. I would like to hear your
suggestions if you have any.

Thanks

On Wed, Sep 2, 2020 at 6:22 PM Talat Uyarer 
wrote:

> I also tried Brian's suggestion to clear stringbuilder by calling delete
> with stringbuffer length. No luck. I am still getting the same error
> message. Do you have any suggestions ?
>
> Thanks
>
> On Wed, Sep 2, 2020 at 3:33 PM Talat Uyarer 
> wrote:
>
>> If I'm understanding Talat's logic correctly, it's not necessary to reuse
>>> the string builder at all in this case.
>>
>> Yes. I tried it too. But DF job has the same issue.
>>
>>
>> On Wed, Sep 2, 2020 at 3:17 PM Kyle Weaver  wrote:
>>
>>> > It looks like `writer.setLength(0)` may actually allocate a new
>>> buffer, and then the buffer may also need to be resized as the String
>>> grows, so you could be creating a lot of orphaned buffers very quickly. I'm
>>> not that familiar with StringBuilder, is there a way to reset it and re-use
>>> the existing capacity? Maybe `writer.delete(0, writer.length())` [1]?
>>>
>>> If I'm understanding Talat's logic correctly, it's not necessary to
>>> reuse the string builder at all in this case.
>>>
>>> On Wed, Sep 2, 2020 at 3:11 PM Brian Hulette 
>>> wrote:
>>>
>>>> That error isn't exactly an OOM, it indicates the JVM is spending a
>>>> significant amount of time in garbage collection.
>>>>
>>>> It looks like `writer.setLength(0)` may actually allocate a new buffer,
>>>> and then the buffer may also need to be resized as the String grows, so you
>>>> could be creating a lot of orphaned buffers very quickly. I'm not that
>>>> familiar with StringBuilder, is there a way to reset it and re-use the
>>>> existing capacity? Maybe `writer.delete(0, writer.length())` [1]?
>>>>
>>>> [1]
>>>> https://stackoverflow.com/questions/242438/is-it-better-to-reuse-a-stringbuilder-in-a-loop
>>>>
>>>> On Wed, Sep 2, 2020 at 3:02 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Sorry for the wrong import. You can see on the code I am using
>>>>> StringBuilder.
>>>>>
>>>>> On Wed, Sep 2, 2020 at 2:55 PM Ning Kang  wrote:
>>>>>
>>>>>> Here is a question answered on StackOverflow:
>>>>>> https://stackoverflow.com/questions/27221292/when-should-i-use-javas-stringwriter
>>>>>>
>>>>>> Could you try using StringBuilder instead since the usage is not
>>>>>> appropriate for a StringWriter?
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 2, 2020 at 2:49 PM Talat Uyarer <
>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have an issue with String Concatenating. You can see my code
>>>>>>> below.[1] I have a step on my df job which is concatenating strings. But
>>>>>>> somehow when I use that step my job starts getting jvm restart errors.
>>>>>>>
>>>>>>>  Shutting down JVM after 8 consecutive periods of measured GC
>>>>>>>> thrashing. Memory is used/total/max = 4112/5994/5994 MB, GC last/max =
>>>>>>>> 97.36/97.36 %, #pushbacks=3, gc thrashing=true. Heap dump not written.
>>>>>>>
>>>>>>>
>>>>>>> And also I try to use Avro rather than String. When I use Avro, it
>>>>>>> works fine without any issue. Do you have any suggestions?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> [1] https://dpaste.com/7RTV86WQC
>>>>>>>
>>>>>>>
>>>>>>>


Re: OOM issue on Dataflow Worker by doing string manipulation

2020-09-02 Thread Talat Uyarer
I also tried Brian's suggestion to clear the StringBuilder by calling delete
with the StringBuilder's length. No luck, I am still getting the same error
message. Do you have any suggestions?

Thanks

On Wed, Sep 2, 2020 at 3:33 PM Talat Uyarer 
wrote:

> If I'm understanding Talat's logic correctly, it's not necessary to reuse
>> the string builder at all in this case.
>
> Yes. I tried it too. But DF job has the same issue.
>
>
> On Wed, Sep 2, 2020 at 3:17 PM Kyle Weaver  wrote:
>
>> > It looks like `writer.setLength(0)` may actually allocate a new buffer,
>> and then the buffer may also need to be resized as the String grows, so you
>> could be creating a lot of orphaned buffers very quickly. I'm not that
>> familiar with StringBuilder, is there a way to reset it and re-use the
>> existing capacity? Maybe `writer.delete(0, writer.length())` [1]?
>>
>> If I'm understanding Talat's logic correctly, it's not necessary to reuse
>> the string builder at all in this case.
>>
>> On Wed, Sep 2, 2020 at 3:11 PM Brian Hulette  wrote:
>>
>>> That error isn't exactly an OOM, it indicates the JVM is spending a
>>> significant amount of time in garbage collection.
>>>
>>> It looks like `writer.setLength(0)` may actually allocate a new buffer,
>>> and then the buffer may also need to be resized as the String grows, so you
>>> could be creating a lot of orphaned buffers very quickly. I'm not that
>>> familiar with StringBuilder, is there a way to reset it and re-use the
>>> existing capacity? Maybe `writer.delete(0, writer.length())` [1]?
>>>
>>> [1]
>>> https://stackoverflow.com/questions/242438/is-it-better-to-reuse-a-stringbuilder-in-a-loop
>>>
>>> On Wed, Sep 2, 2020 at 3:02 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Sorry for the wrong import. You can see on the code I am using
>>>> StringBuilder.
>>>>
>>>> On Wed, Sep 2, 2020 at 2:55 PM Ning Kang  wrote:
>>>>
>>>>> Here is a question answered on StackOverflow:
>>>>> https://stackoverflow.com/questions/27221292/when-should-i-use-javas-stringwriter
>>>>>
>>>>> Could you try using StringBuilder instead since the usage is not
>>>>> appropriate for a StringWriter?
>>>>>
>>>>>
>>>>> On Wed, Sep 2, 2020 at 2:49 PM Talat Uyarer <
>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have an issue with String Concatenating. You can see my code
>>>>>> below.[1] I have a step on my df job which is concatenating strings. But
>>>>>> somehow when I use that step my job starts getting jvm restart errors.
>>>>>>
>>>>>>  Shutting down JVM after 8 consecutive periods of measured GC
>>>>>>> thrashing. Memory is used/total/max = 4112/5994/5994 MB, GC last/max =
>>>>>>> 97.36/97.36 %, #pushbacks=3, gc thrashing=true. Heap dump not written.
>>>>>>
>>>>>>
>>>>>> And also I try to use Avro rather than String. When I use Avro, it
>>>>>> works fine without any issue. Do you have any suggestions?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> [1] https://dpaste.com/7RTV86WQC
>>>>>>
>>>>>>
>>>>>>


Re: OOM issue on Dataflow Worker by doing string manipulation

2020-09-02 Thread Talat Uyarer
>
> If I'm understanding Talat's logic correctly, it's not necessary to reuse
> the string builder at all in this case.

Yes, I tried that too, but the DF job has the same issue.


On Wed, Sep 2, 2020 at 3:17 PM Kyle Weaver  wrote:

> > It looks like `writer.setLength(0)` may actually allocate a new buffer,
> and then the buffer may also need to be resized as the String grows, so you
> could be creating a lot of orphaned buffers very quickly. I'm not that
> familiar with StringBuilder, is there a way to reset it and re-use the
> existing capacity? Maybe `writer.delete(0, writer.length())` [1]?
>
> If I'm understanding Talat's logic correctly, it's not necessary to reuse
> the string builder at all in this case.
>
> On Wed, Sep 2, 2020 at 3:11 PM Brian Hulette  wrote:
>
>> That error isn't exactly an OOM, it indicates the JVM is spending a
>> significant amount of time in garbage collection.
>>
>> It looks like `writer.setLength(0)` may actually allocate a new buffer,
>> and then the buffer may also need to be resized as the String grows, so you
>> could be creating a lot of orphaned buffers very quickly. I'm not that
>> familiar with StringBuilder, is there a way to reset it and re-use the
>> existing capacity? Maybe `writer.delete(0, writer.length())` [1]?
>>
>> [1]
>> https://stackoverflow.com/questions/242438/is-it-better-to-reuse-a-stringbuilder-in-a-loop
>>
>> On Wed, Sep 2, 2020 at 3:02 PM Talat Uyarer 
>> wrote:
>>
>>> Sorry for the wrong import. You can see on the code I am using
>>> StringBuilder.
>>>
>>> On Wed, Sep 2, 2020 at 2:55 PM Ning Kang  wrote:
>>>
>>>> Here is a question answered on StackOverflow:
>>>> https://stackoverflow.com/questions/27221292/when-should-i-use-javas-stringwriter
>>>>
>>>> Could you try using StringBuilder instead since the usage is not
>>>> appropriate for a StringWriter?
>>>>
>>>>
>>>> On Wed, Sep 2, 2020 at 2:49 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have an issue with String Concatenating. You can see my code
>>>>> below.[1] I have a step on my df job which is concatenating strings. But
>>>>> somehow when I use that step my job starts getting jvm restart errors.
>>>>>
>>>>>  Shutting down JVM after 8 consecutive periods of measured GC
>>>>>> thrashing. Memory is used/total/max = 4112/5994/5994 MB, GC last/max =
>>>>>> 97.36/97.36 %, #pushbacks=3, gc thrashing=true. Heap dump not written.
>>>>>
>>>>>
>>>>> And also I try to use Avro rather than String. When I use Avro, it
>>>>> works fine without any issue. Do you have any suggestions?
>>>>>
>>>>> Thanks
>>>>>
>>>>> [1] https://dpaste.com/7RTV86WQC
>>>>>
>>>>>
>>>>>


Re: OOM issue on Dataflow Worker by doing string manipulation

2020-09-02 Thread Talat Uyarer
>
> You can try scoping the string builder instance to processElement, instead
> of making it a member of your DoFn.
>
I tried creating a StringBuilder in the beamRow2CsvLine function too, but it
has a similar issue. I put the StringBuilder in setup to reuse the same object
per bundle and reduce object creation. AFAIK setup is only called by a
single thread. Based on my tests, reusing the StringBuilder
increases performance by about 25%.


My logic is actually simple: I need to convert a Beam Row to a CSV string row.
I can try Brian's suggestion.
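
For reference, a minimal sketch of that Row-to-CSV conversion with the StringBuilder scoped
to processElement (one of the variants discussed in this thread); quoting and escaping of
field values is deliberately omitted, and the class name is illustrative:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

class RowToCsvFn extends DoFn<Row, String> {
  @ProcessElement
  public void process(@Element Row row, OutputReceiver<String> out) {
    // A fresh, locally scoped StringBuilder per element; nothing is retained across calls.
    StringBuilder line = new StringBuilder();
    int fieldCount = row.getSchema().getFieldCount();
    for (int i = 0; i < fieldCount; i++) {
      if (i > 0) {
        line.append(',');
      }
      Object value = row.getValue(i);
      line.append(value == null ? "" : value.toString());
    }
    out.output(line.toString());
  }
}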



On Wed, Sep 2, 2020 at 3:11 PM Brian Hulette  wrote:

> That error isn't exactly an OOM, it indicates the JVM is spending a
> significant amount of time in garbage collection.
>
> It looks like `writer.setLength(0)` may actually allocate a new buffer,
> and then the buffer may also need to be resized as the String grows, so you
> could be creating a lot of orphaned buffers very quickly. I'm not that
> familiar with StringBuilder, is there a way to reset it and re-use the
> existing capacity? Maybe `writer.delete(0, writer.length())` [1]?
>
> [1]
> https://stackoverflow.com/questions/242438/is-it-better-to-reuse-a-stringbuilder-in-a-loop
>
> On Wed, Sep 2, 2020 at 3:02 PM Talat Uyarer 
> wrote:
>
>> Sorry for the wrong import. You can see on the code I am using
>> StringBuilder.
>>
>> On Wed, Sep 2, 2020 at 2:55 PM Ning Kang  wrote:
>>
>>> Here is a question answered on StackOverflow:
>>> https://stackoverflow.com/questions/27221292/when-should-i-use-javas-stringwriter
>>>
>>> Could you try using StringBuilder instead since the usage is not
>>> appropriate for a StringWriter?
>>>
>>>
>>> On Wed, Sep 2, 2020 at 2:49 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have an issue with String Concatenating. You can see my code
>>>> below.[1] I have a step on my df job which is concatenating strings. But
>>>> somehow when I use that step my job starts getting jvm restart errors.
>>>>
>>>>  Shutting down JVM after 8 consecutive periods of measured GC
>>>>> thrashing. Memory is used/total/max = 4112/5994/5994 MB, GC last/max =
>>>>> 97.36/97.36 %, #pushbacks=3, gc thrashing=true. Heap dump not written.
>>>>
>>>>
>>>> And also I try to use Avro rather than String. When I use Avro, it
>>>> works fine without any issue. Do you have any suggestions?
>>>>
>>>> Thanks
>>>>
>>>> [1] https://dpaste.com/7RTV86WQC
>>>>
>>>>
>>>>


Re: OOM issue on Dataflow Worker by doing string manipulation

2020-09-02 Thread Talat Uyarer
Sorry for the wrong import. As you can see in the code, I am using
StringBuilder.

On Wed, Sep 2, 2020 at 2:55 PM Ning Kang  wrote:

> Here is a question answered on StackOverflow:
> https://stackoverflow.com/questions/27221292/when-should-i-use-javas-stringwriter
>
> Could you try using StringBuilder instead since the usage is not
> appropriate for a StringWriter?
>
>
> On Wed, Sep 2, 2020 at 2:49 PM Talat Uyarer 
> wrote:
>
>> Hi,
>>
>> I have an issue with String Concatenating. You can see my code below.[1]
>> I have a step on my df job which is concatenating strings. But somehow when
>> I use that step my job starts getting jvm restart errors.
>>
>>  Shutting down JVM after 8 consecutive periods of measured GC thrashing.
>>> Memory is used/total/max = 4112/5994/5994 MB, GC last/max = 97.36/97.36 %,
>>> #pushbacks=3, gc thrashing=true. Heap dump not written.
>>
>>
>> And also I try to use Avro rather than String. When I use Avro, it works
>> fine without any issue. Do you have any suggestions?
>>
>> Thanks
>>
>> [1] https://dpaste.com/7RTV86WQC
>>
>>
>>


OOM issue on Dataflow Worker by doing string manipulation

2020-09-02 Thread Talat Uyarer
Hi,

I have an issue with string concatenation. You can see my code below [1]. I
have a step in my DF job which concatenates strings, but somehow when I
use that step my job starts getting JVM restart errors.

 Shutting down JVM after 8 consecutive periods of measured GC thrashing.
> Memory is used/total/max = 4112/5994/5994 MB, GC last/max = 97.36/97.36 %,
> #pushbacks=3, gc thrashing=true. Heap dump not written.


I also tried using Avro rather than String. When I use Avro, it works
fine without any issue. Do you have any suggestions?

Thanks

[1] https://dpaste.com/7RTV86WQC


Re: Resource Consumption increase With TupleTag

2020-08-20 Thread Talat Uyarer
Hi Luke,

> Not really. It is more about pipeline complexity, logging, debugging,
> monitoring which become more complex.

Should I use a different consumer group or the same consumer group? And
also, how will autoscaling decide the worker count?

> What do you mean by it's not working properly?

Actually, I should correct my statement: both jobs are using tuple tags, but
when I add one more branch after MessageExtractor things change.

> What does the timing information for the transforms tell you on the
> Dataflow Job UI?

Based on the wall time in the DAG, KafkaIO is the slowest step in my pipeline;
its wall time shows 28 days. I put the wall time for each step below.


                                                 |---> Filter1 (1 day)   --> WriteGCS (1 day)
KafkaIO (28 days) -> MessageExtractor (7 hrs) -> |
                                                 |---> Filter2 (13 days) --> WriteGCS (2 days)

Thanks

On Thu, Aug 20, 2020 at 10:58 AM Luke Cwik  wrote:

> Do you mean I can put my simple pipeline multiple times for all topics in
> one dataflow job ?
> Yes
>
> Is there any side effect having multiple independent DAG on one DF job ?
> Not really. It is more about pipeline complexity, logging, debugging,
> monitoring which become more complex.
>
> And also why the TupleTag model is not working properly?
> What do you mean by it's not working properly?
>
> Why is it using more resources than what it should be?
> What does the timing information for the transforms tell you on the
> Dataflow Job UI? (Even if MessageExtractor seems simple it isn't free, You
> have to now write to two GCS locations instead of one for each work item
> that you process so your doing more network calls)
>
>
> On Wed, Aug 19, 2020 at 8:36 PM Talat Uyarer 
> wrote:
>
>> Filter step is an independent step. We can think it is an etl step or
>> something else. MessageExtractor step writes messages on TupleTags based on
>> the kafka header. Yes, MessageExtractor is literally a multi-output DoFn
>> already. MessageExtractor is processing 48kps but branches are processing
>> their logs. Each Filter only consumes its log type. There is no any  So
>> That's why I assume it should consume the same amount of workers. But it
>> consumes more workers.
>>
>>
>>
>>  |--->Filter1(20kps)-->WriteGCS
>> KafkaIO->MessageExtractor(48kps)-> |
>>
>>  |--->Filter2(28kps)-->WriteGCS
>>
>> Do you mean I can put my simple pipeline multiple times for all topics in
>> one dataflow job ? Is there any side effect having multiple independent DAG
>> on one DF job ? And also why the TupleTag model is not working properly?
>> Why is it using more resources than what it should be?
>>
>> Thanks
>>
>>
>>
>> On Wed, Aug 19, 2020 at 5:16 PM Robert Bradshaw 
>> wrote:
>>
>>> Just to clarify, previously you had.
>>>
>>> KafkaIO(topic1) --20kps--> Filter1 -> WriteGCS
>>> KafkaIO(topic2) --28kps--> Filter2 -> WriteGCS
>>>
>>> And now you have
>>>
>>>
>>>   ---48kps--> Filter1
>>> -> WriteGCS
>>>   /
>>> KafkaIO(topic1, topic2) + MessageExtractor
>>>\
>>>  ---48kps--> Filter2 ->
>>> WriteGCS
>>>
>>> Each filter is now actually consuming (and throwing away) more data than
>>> before.
>>>
>>> Or is MessageExtractor literally a multi-output DoFn already (which is
>>> why you're talking about TupleTags). This could possibly be more
>>> expensive if reading Kafak with headers is more expensive than reading
>>> it without.
>>>
>>> If topic1 and topic2 are truly independent, I would keep their reads
>>> separate. This will simplify your pipeline (and sounds like it'll
>>> improve performance). Note that you don't have to have a separate
>>> Dataflow job for each read, you can have a single Pipeline and do as
>>> many reads as you want and the'll all get executed in the same job.
>>>
>>> On Wed, Aug 19, 2020 at 4:14 PM Talat Uyarer
>>>  wrote:
>>> >
>>> > Hi Robert,
>>> >
>>> > I calculated process speed based on worker count. When I have separate
>>> jobs. topic1 job used 5 workers, topic2 job used 7 workers. Based on
>>> KafkaIO message count. they had 4kps processing speed per worker. After I
>>> combine them in one df job. That DF job started using ~18 workers, not 12
>>> workers.
>>> >
>>> > How can I understand i

Re: Resource Consumption increase With TupleTag

2020-08-19 Thread Talat Uyarer
The Filter step is an independent step; we can think of it as an ETL step or
something else. The MessageExtractor step writes messages to TupleTags based on
the Kafka header, so yes, MessageExtractor is literally a multi-output DoFn
already. MessageExtractor is processing 48kps, but each branch only processes
its own logs; each Filter only consumes its log type. That's why I assumed it
should consume the same amount of workers, but it consumes more workers.



                                       |---> Filter1 (20kps) --> WriteGCS
KafkaIO -> MessageExtractor (48kps) -> |
                                       |---> Filter2 (28kps) --> WriteGCS
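
For anyone following along, a rough sketch of what such a header-routing multi-output DoFn
looks like; the header name, the log-type values, and the tag/variable names are assumptions
for illustration, not the actual MessageExtractor code:

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.kafka.common.header.Header;

final TupleTag<KafkaRecord<String, String>> type1Tag = new TupleTag<KafkaRecord<String, String>>() {};
final TupleTag<KafkaRecord<String, String>> type2Tag = new TupleTag<KafkaRecord<String, String>>() {};

// 'records' is the PCollection<KafkaRecord<String, String>> coming out of KafkaIO.read().
PCollectionTuple extracted = records.apply("MessageExtractor",
    ParDo.of(new DoFn<KafkaRecord<String, String>, KafkaRecord<String, String>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        KafkaRecord<String, String> rec = c.element();
        Header header = rec.getHeaders().lastHeader("log-type");  // assumed header name
        String logType = header == null ? "" : new String(header.value(), StandardCharsets.UTF_8);
        if ("type1".equals(logType)) {
          c.output(rec);              // main output goes to type1Tag
        } else {
          c.output(type2Tag, rec);    // additional output goes to type2Tag
        }
      }
    }).withOutputTags(type1Tag, TupleTagList.of(type2Tag)));
// extracted.get(type1Tag) and extracted.get(type2Tag) then feed Filter1 and Filter2.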

Do you mean I can put my simple pipeline multiple times, once for each topic,
in one Dataflow job? Is there any side effect of having multiple independent DAGs
in one DF job? And also, why is the TupleTag model not working as expected?
Why is it using more resources than it should?

Thanks



On Wed, Aug 19, 2020 at 5:16 PM Robert Bradshaw  wrote:

> Just to clarify, previously you had.
>
> KafkaIO(topic1) --20kps--> Filter1 -> WriteGCS
> KafkaIO(topic2) --28kps--> Filter2 -> WriteGCS
>
> And now you have
>
>
>   ---48kps--> Filter1
> -> WriteGCS
>   /
> KafkaIO(topic1, topic2) + MessageExtractor
>\
>  ---48kps--> Filter2 ->
> WriteGCS
>
> Each filter is now actually consuming (and throwing away) more data than
> before.
>
> Or is MessageExtractor literally a multi-output DoFn already (which is
> why you're talking about TupleTags). This could possibly be more
> expensive if reading Kafak with headers is more expensive than reading
> it without.
>
> If topic1 and topic2 are truly independent, I would keep their reads
> separate. This will simplify your pipeline (and sounds like it'll
> improve performance). Note that you don't have to have a separate
> Dataflow job for each read, you can have a single Pipeline and do as
> many reads as you want and the'll all get executed in the same job.
>
> On Wed, Aug 19, 2020 at 4:14 PM Talat Uyarer
>  wrote:
> >
> > Hi Robert,
> >
> > I calculated process speed based on worker count. When I have separate
> jobs. topic1 job used 5 workers, topic2 job used 7 workers. Based on
> KafkaIO message count. they had 4kps processing speed per worker. After I
> combine them in one df job. That DF job started using ~18 workers, not 12
> workers.
> >
> > How can I understand if they are poorly fused or not ? I can not write
> Filter because it is a beamsql. I just want to simplified my DAG that's why
> i did not mentioned
> >
> > Thanks
> >
> > On Wed, Aug 19, 2020 at 3:54 PM Robert Bradshaw 
> wrote:
> >>
> >> Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
> >> would be 4kps total), or only 2kps coming out of KafkaIO and
> >> MessageExtractor?
> >>
> >> Though it /shouldn't/ matter, due to sibling fusion, there's a chance
> >> things are getting fused poorly and you could write Filter1 and
> >> Filter2 instead as a DoFn with multiple outputs (see
> >>
> https://beam.apache.org/documentation/programming-guide/#additional-outputs
> ).
> >>
> >> - Robert
> >>
> >> On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
> >>  wrote:
> >> >
> >> > Hi,
> >> >
> >> > I have a very simple DAG on my dataflow job.
> (KafkaIO->Filter->WriteGCS). When I submit this Dataflow job per topic it
> has 4kps per instance processing speed. However I want to consume two
> different topics in one DF job. I used TupleTag. I created TupleTags per
> message type. Each topic has different message types and also needs
> different filters. So my pipeline turned to below DAG. Message Extractor is
> a very simple step checking header of kafka messages and writing the
> correct TupleTag. However after starting to use this new DAG, dataflow
> canprocess 2kps per instance.
> >> >
> >> >
> |--->Filter1-->WriteGCS
> >> > KafkaIO->MessageExtractor-> |
> >> >
> |--->Filter2-->WriteGCS
> >> >
> >> > Do you have any idea why my data process speed decreased ?
> >> >
> >> > Thanks
>


Re: Resource Consumption increase With TupleTag

2020-08-19 Thread Talat Uyarer
Hi Robert,

I calculated the processing speed based on worker count. When I had
separate jobs, the topic1 job used 5 workers and the topic2 job used 7 workers;
based on the KafkaIO message count they had a processing speed of 4kps per worker.
After I combined them into one DF job, that DF job started using ~18 workers,
not 12.

How can I tell whether they are poorly fused or not? I cannot rewrite the
Filter because it is Beam SQL. I just wanted to simplify my DAG; that's why
I did not mention it.

Thanks

On Wed, Aug 19, 2020 at 3:54 PM Robert Bradshaw  wrote:

> Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
> would be 4kps total), or only 2kps coming out of KafkaIO and
> MessageExtractor?
>
> Though it /shouldn't/ matter, due to sibling fusion, there's a chance
> things are getting fused poorly and you could write Filter1 and
> Filter2 instead as a DoFn with multiple outputs (see
>
> https://beam.apache.org/documentation/programming-guide/#additional-outputs
> ).
>
> - Robert
>
> On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
>  wrote:
> >
> > Hi,
> >
> > I have a very simple DAG on my dataflow job.
> (KafkaIO->Filter->WriteGCS). When I submit this Dataflow job per topic it
> has 4kps per instance processing speed. However I want to consume two
> different topics in one DF job. I used TupleTag. I created TupleTags per
> message type. Each topic has different message types and also needs
> different filters. So my pipeline turned to below DAG. Message Extractor is
> a very simple step checking header of kafka messages and writing the
> correct TupleTag. However after starting to use this new DAG, dataflow
> canprocess 2kps per instance.
> >
> >  |--->Filter1-->WriteGCS
> > KafkaIO->MessageExtractor-> |
> >  |--->Filter2-->WriteGCS
> >
> > Do you have any idea why my data process speed decreased ?
> >
> > Thanks
>


Resource Consumption increase With TupleTag

2020-08-19 Thread Talat Uyarer
Hi,

I have a very simple DAG in my Dataflow job (KafkaIO -> Filter -> WriteGCS).
When I submit this Dataflow job per topic, it has a processing speed of 4kps
per instance. However, I want to consume two different topics in one DF
job, so I used TupleTag: I created a TupleTag per message type. Each topic has
different message types and also needs different filters, so my pipeline
turned into the DAG below. MessageExtractor is a very simple step that checks
the header of each Kafka message and writes to the correct TupleTag. However,
after starting to use this new DAG, Dataflow can process 2kps per instance.

                               |---> Filter1 --> WriteGCS
KafkaIO -> MessageExtractor -> |
                               |---> Filter2 --> WriteGCS

Do you have any idea why my data processing speed decreased?

Thanks


Re: KafkaIO does not support add or remove topics

2020-06-29 Thread Talat Uyarer
Thanks Alexey for sharing the tickets and code. I found one workaround to use
the update function: if I generate a different KafkaIO step name for each
submission and provide
--transformNameMapping="{"Kafka_IO_3242/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds":""}"
then my update command succeeds. I have a question about it: is there any
way to prevent data loss? I believe that when I provide
that transformNameMapping, DF destroys the previous KafkaIO along with its state.
Does .commitOffsetsInFinalize() help me prevent data loss? I am OK
with a small amount of data duplication.
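
In case it helps others, a sketch of the combination being discussed: a per-submission step
name plus commitOffsetsInFinalize(), so a replaced source resumes from the committed group
offsets. The broker, topics, group id, and step-name scheme are placeholders, and this does
not guarantee exactly-once; some duplication is still possible.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("group.id", "my-consumer-group");  // required for offset commits

String stepName = "Kafka_IO_" + System.currentTimeMillis();  // unique name per submission

// 'pipeline' is the Pipeline being submitted.
pipeline.apply(stepName,
    KafkaIO.<String, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopics(Arrays.asList("topic-a", "topic-b"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(consumerConfig)
        .commitOffsetsInFinalize());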

Thanks

On Mon, Jun 29, 2020 at 10:17 AM Alexey Romanenko 
wrote:

> Yes, it’s a known limitation [1] mostly due to the fact that KafkaIO.Read
> is based on UnboundedSource API and it fetches all information about topic
> and partitions only once during a “split" phase [2]. There is on-going work
> to make KafkaIO.Read based on Splittable DoFn [3] which should allow to get
> topic/partitions information dynamically, without restarting a pipeline.
>
> [1] https://issues.apache.org/jira/browse/BEAM-727
> [2]
> https://github.com/apache/beam/blob/8a54c17235f768f089b36265d79e69ee9b27a2ce/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
> [3] https://issues.apache.org/jira/browse/BEAM-9977
>
>
> On 29 Jun 2020, at 18:14, Talat Uyarer 
> wrote:
>
> Hi,
>
> I am using Dataflow. When I update my DF job with a new topic or update
> partition count on the same topic. I can not use DF's update function.
> Could you help me to understand why I can not use the update function for
> that ?
>
> I checked the Beam source code but I could not find the right place to
> read on the code.
>
> Thanks for your help in advance
> Talat
>
>
>


KafkaIO does not support add or remove topics

2020-06-29 Thread Talat Uyarer
Hi,

I am using Dataflow. When I update my DF job with a new topic, or update
the partition count on the same topic, I cannot use DF's update function.
Could you help me understand why I cannot use the update function for
that?

I checked the Beam source code but could not find the right place to read
in the code.

Thanks for your help in advance
Talat


Re: Building Dataflow Worker

2020-06-15 Thread Talat Uyarer
Thank you Luke and Steve for your help.

It looks like there is an issue with the Gradle Errorprone plugin. I had to
modify BeamModulePlugin.groovy [1]; without that modification, when I tried to
build it I got this error [2]. Should I create a patch for it?

One last question: we are using Streaming Engine. Which module should we build
to replace the Dataflow worker jar file?

Thanks

[1] http://dpaste.com/238QH9Z
[2] http://dpaste.com/01J55J0

On Mon, Jun 15, 2020 at 1:57 PM Luke Cwik  wrote:

> I noticed that you are not using the gradle wrapper but your own installed
> version. Apache Beam 2.19 is using gradle 5.2.1, is the installed version
> compatible with that?
>
> Try
> ./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
> in a clean workspace.
>
> On Fri, Jun 12, 2020 at 4:30 PM Talat Uyarer 
> wrote:
>
>> Hi,
>>
>> I want to build the dataflow worker on apache beram 2.19. However I faced
>> a grpc issue. I did not change anything. Just checked release-2.19.0 branch
>> and run build command. Could you help me understand why it does not build.
>> [1]
>>
>> Additional information, Based on my limited knowledge Looks like it is
>> looking for a class which is coming grpc 1.26 version. But beam 2.19
>> version is using grpc 1.21
>>
>> You can find build output below.
>>
>> Thanks
>> [1] http://dpaste.com/15X7429
>>
>


Building Dataflow Worker

2020-06-12 Thread Talat Uyarer
Hi,

I want to build the Dataflow worker for Apache Beam 2.19, but I hit a
gRPC issue. I did not change anything; I just checked out the release-2.19.0
branch and ran the build command. Could you help me understand why it does not
build? [1]

Additional information: based on my limited knowledge, it looks like the build
is looking for a class that comes from gRPC 1.26, but Beam 2.19
uses gRPC 1.21.

You can find build output below.

Thanks
[1] http://dpaste.com/15X7429


Re: KafkaIO Read Latency

2020-06-10 Thread Talat Uyarer
Hi,

I record the time when StartBundle is called, do the same for
FinishBundle, then take the difference between the start and finish bundle times
and report it as bundle latency. I put this metric on
a step (KafkaMessageExtractor) which is right after the KafkaIO step. I don't
know if this is related, but my pipeline has a windowing function and
GroupIntoBatches; the window duration is 10 seconds and the batch size is 400.
My current traffic is 8kps. I changed the window duration to 5 seconds and to
20 seconds, but it does not change much.

KafkaIO -> KafkaMessageExtractor -> Windowing Function -> Sink

.apply(Window.<KV<K, V>>into(  // KV key/value types omitted here
        FixedWindows.of(Duration.standardSeconds(windowDurationSeconds)))
    .triggering(Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast((int) batchSize),
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(windowDurationSeconds)))))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
.apply(GroupIntoBatches.ofSize(batchSize))
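
For completeness, the bundle-latency measurement described above is roughly this kind of
pass-through DoFn (the metric name and class name are illustrative, not the actual step):

import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

class BundleLatencyFn<T> extends DoFn<T, T> {
  private final Distribution bundleMillis =
      Metrics.distribution(BundleLatencyFn.class, "bundle_duration_ms");
  private transient long bundleStart;

  @StartBundle
  public void startBundle() {
    bundleStart = System.currentTimeMillis();
  }

  @ProcessElement
  public void process(ProcessContext c) {
    c.output(c.element());  // pass-through; the real step also does its normal work here
  }

  @FinishBundle
  public void finishBundle() {
    bundleMillis.update(System.currentTimeMillis() - bundleStart);
  }
}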


Thanks


On Wed, Jun 10, 2020 at 8:37 AM Alexey Romanenko 
wrote:

> Hi Talat,
>
> Could you elaborate what do you mean by “*opening and closing bundle*”?
>
> Sometimes, starting a KafkaReader can take time since it will seek for a
> start offset for each assigned partition but it happens only once at
> pipeline start-up and mostly depends on network conditions.
>
> On 9 Jun 2020, at 23:05, Talat Uyarer 
> wrote:
>
> Hi,
> I added some metrics on a step right after KafkaIO. When I compare the
> read time difference between producer and KafkaIO it is 800ms for P99.
> However somehow that step's opening and closing bundle difference is 18
> seconds for p99. The step itself does not do any specific thing. Do you
> have any idea why bundle latency is very high ? Where should I check or
> tune on KafkaIO ?
>
> Additional information I read from one topic. That topic has 15
> partitions. Producer write in a round robin fashion.
>
> Thanks
>
>
>


KafkaIO Read Latency

2020-06-09 Thread Talat Uyarer
Hi,
I added some metrics on a step right after KafkaIO. When I compare the read-time
difference between the producer and KafkaIO, it is 800ms at P99. However,
somehow that step's difference between opening and closing a bundle is 18 seconds
at P99. The step itself does not do anything specific. Do you have any idea
why the bundle latency is so high? What should I check or tune on KafkaIO?

Additional information: I read from one topic. That topic has 15 partitions,
and the producer writes in a round-robin fashion.

Thanks


Re: Pipeline Processing Time

2020-06-09 Thread Talat Uyarer
Thank you Luke and Reuven for helping me. Now I can see my pipeline
processing time for each record.
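
For the archives, a minimal sketch of the kind of measurement this leads to, assuming
KafkaIO's default TimestampPolicy (processing time at read) so that now() - timestamp
approximates time spent in the pipeline; the class and metric names are illustrative:

import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

class MeasureLatencyFn extends DoFn<String, String> {
  private final Distribution latencyMs =
      Metrics.distribution(MeasureLatencyFn.class, "element_latency_ms");

  @ProcessElement
  public void process(@Element String element,
                      @Timestamp Instant timestamp,
                      OutputReceiver<String> out) {
    // The element timestamp was assigned when KafkaIO read the record.
    latencyMs.update(Instant.now().getMillis() - timestamp.getMillis());
    out.output(element);
  }
}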

On Wed, Jun 3, 2020 at 9:25 AM Reuven Lax  wrote:

> Note: you need to tag the timestamp parameter to @ProcessElement with
> the @Timestamp annotation.
>
> On Mon, Jun 1, 2020 at 3:31 PM Luke Cwik  wrote:
>
>> You can configure KafkaIO to use some data from the record as the
>> elements timestamp. See the KafkaIO javadoc around the TimestampPolicy[1],
>> the default is current processing time.
>> You can access the timestamp of the element by adding
>> "org.joda.time.Instant timestamp" as a parameter to your @ProcessElement,
>> see this javadoc for additional details[2]. You could then compute now() -
>> timestamp to calculate processing time.
>>
>> 1:
>> https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/io/kafka/TimestampPolicy.html
>> 2:
>> https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html
>>
>> On Mon, Jun 1, 2020 at 2:00 PM Talat Uyarer 
>> wrote:
>>
>>> Sorry for the late response. Where does the beam set that timestamp
>>> field on element ? Is it set whenever KafkaIO reads that element ?
>>>
>> And also I have a windowing function on my pipeline. Does the timestamp
>>> field change for any kind of operation ? On pipeline I have the
>>> following steps: KafkaIO -> Format Conversion Pardo -> SQL Filter ->
>>> Windowing Step -> Custom Sink. If timestamp set in KafkaIO, Can I see
>>> process time by now() - timestamp in Custom Sink ?
>>>
>>>
>> Thanks
>>>
>>> On Thu, May 28, 2020 at 2:07 PM Luke Cwik  wrote:
>>>
>>>> Dataflow provides msec counters for each transform that executes. You
>>>> should be able to get them from stackdriver and see them from the Dataflow
>>>> UI.
>>>>
>>>> You need to keep track of the timestamp of the element as it flows
>>>> through the system as part of data that goes alongside the element. You can
>>>> use the element's timestamp[1] if that makes sense (it might not if you
>>>> intend to use a timestamp that is from the kafka record itself and the
>>>> record's timestamp isn't the same as the ingestion timestamp). Unless you
>>>> are writing your own sink, the sink won't track the processing time at all
>>>> so you'll need to add a ParDo that goes right before it that writes the
>>>> timing information to wherever you want (a counter, your own metrics
>>>> database, logs, ...).
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/blob/018e889829e300ab9f321da7e0010ff0011a73b1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L257
>>>>
>>>>
>>>> On Thu, May 28, 2020 at 1:12 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Yes I am trying to track how long it takes for a single element to be
>>>>> ingested into the pipeline until it is output somewhere.
>>>>>
>>>>> My pipeline is unbounded. I am using KafkaIO. I did not think about
>>>>> CPU time. if there is a way to track it too, it would be useful to improve
>>>>> my metrics.
>>>>>
>>>>> On Thu, May 28, 2020 at 12:52 PM Luke Cwik  wrote:
>>>>>
>>>>>> What do you mean by processing time?
>>>>>>
>>>>>> Are you trying to track how long it takes f

Re: Pipeline Processing Time

2020-06-01 Thread Talat Uyarer
Sorry for the late response. Where does Beam set that timestamp field
on the element? Is it set whenever KafkaIO reads the element? Also, I
have a windowing function in my pipeline; does the timestamp field change
for any kind of operation? In the pipeline I have the following steps: KafkaIO
-> Format Conversion ParDo -> SQL Filter -> Windowing Step -> Custom Sink.
If the timestamp is set in KafkaIO, can I see the processing time as
now() - timestamp in the Custom Sink?

Thanks

On Thu, May 28, 2020 at 2:07 PM Luke Cwik  wrote:

> Dataflow provides msec counters for each transform that executes. You
> should be able to get them from stackdriver and see them from the Dataflow
> UI.
>
> You need to keep track of the timestamp of the element as it flows through
> the system as part of data that goes alongside the element. You can use the
> element's timestamp[1] if that makes sense (it might not if you intend to
> use a timestamp that is from the kafka record itself and the record's
> timestamp isn't the same as the ingestion timestamp). Unless you are
> writing your own sink, the sink won't track the processing time at all so
> you'll need to add a ParDo that goes right before it that writes the timing
> information to wherever you want (a counter, your own metrics database,
> logs, ...).
>
> 1:
> https://github.com/apache/beam/blob/018e889829e300ab9f321da7e0010ff0011a73b1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L257
>
>
> On Thu, May 28, 2020 at 1:12 PM Talat Uyarer 
> wrote:
>
>> Yes I am trying to track how long it takes for a single element to be
>> ingested into the pipeline until it is output somewhere.
>>
>> My pipeline is unbounded. I am using KafkaIO. I did not think about CPU
>> time. if there is a way to track it too, it would be useful to improve my
>> metrics.
>>
>> On Thu, May 28, 2020 at 12:52 PM Luke Cwik  wrote:
>>
>>> What do you mean by processing time?
>>>
>>> Are you trying to track how long it takes for a single element to be
>>> ingested into the pipeline until it is output somewhere?
>>> Do you have a bounded pipeline and want to know how long all the
>>> processing takes?
>>> Do you care about how much CPU time is being consumed in aggregate for
>>> all the processing that your pipeline is doing?
>>>
>>>
>>> On Thu, May 28, 2020 at 11:01 AM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> I am using Dataflow Runner. The pipeline read from kafkaIO and send
>>>> Http. I could not find any metadata field on the element to set first read
>>>> time.
>>>>
>>>> On Thu, May 28, 2020 at 10:44 AM Kyle Weaver 
>>>> wrote:
>>>>
>>>>> Which runner are you using?
>>>>>
>>>>> On Thu, May 28, 2020 at 1:43 PM Talat Uyarer <
>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a pipeline which has 5 steps. What is the best way to measure
>>>>>> processing time for my pipeline?
>>>>>>
>>>>>> Thnaks
>>>>>>
>>>>>


Re: Pipeline Processing Time

2020-05-28 Thread Talat Uyarer
Yes, I am trying to track how long it takes from the moment a single element is
ingested into the pipeline until it is output somewhere.

My pipeline is unbounded and I am using KafkaIO. I did not think about CPU
time; if there is a way to track that too, it would be useful for improving my
metrics.

On Thu, May 28, 2020 at 12:52 PM Luke Cwik  wrote:

> What do you mean by processing time?
>
> Are you trying to track how long it takes for a single element to be
> ingested into the pipeline until it is output somewhere?
> Do you have a bounded pipeline and want to know how long all the
> processing takes?
> Do you care about how much CPU time is being consumed in aggregate for all
> the processing that your pipeline is doing?
>
>
> On Thu, May 28, 2020 at 11:01 AM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
>
>> I am using Dataflow Runner. The pipeline read from kafkaIO and send Http.
>> I could not find any metadata field on the element to set first read time.
>>
>> On Thu, May 28, 2020 at 10:44 AM Kyle Weaver  wrote:
>>
>>> Which runner are you using?
>>>
>>> On Thu, May 28, 2020 at 1:43 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a pipeline which has 5 steps. What is the best way to measure
>>>> processing time for my pipeline?
>>>>
>>>> Thnaks
>>>>
>>>


Re: Pipeline Processing Time

2020-05-28 Thread Talat Uyarer
I am using the Dataflow runner. The pipeline reads from KafkaIO and sends HTTP
requests. I could not find any metadata field on the element for recording the
first read time.

On Thu, May 28, 2020 at 10:44 AM Kyle Weaver  wrote:

> Which runner are you using?
>
> On Thu, May 28, 2020 at 1:43 PM Talat Uyarer 
> wrote:
>
>> Hi,
>>
>> I have a pipeline which has 5 steps. What is the best way to measure
>> processing time for my pipeline?
>>
>> Thanks
>>
>


Pipeline Processing Time

2020-05-28 Thread Talat Uyarer
Hi,

I have a pipeline which has 5 steps. What is the best way to measure
processing time for my pipeline?

Thanks


KafkaIO BackLog Elements Metrics

2020-05-09 Thread Talat Uyarer
Hi,

I want to get Kafka's backlog metrics. In the Apache Beam code I saw that
Beam collects these metrics as source metrics here [1]. However, I cannot
see those metrics in Dataflow's Metrics Explorer. Do you know if there is
any way to get them?

I also saw that there is a MetricsSink, but according to the Beam
documentation it is not supported by Dataflow. Is there a ticket for adding
MetricsSink support to the Dataflow runner?

Thanks

[1]
https://github.com/apache/beam/blob/d309c1b7c39ba78aa0cbd5cad9cc7a256e3caa9f/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L673
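
A hedged sketch of reading those source metrics through the Beam metrics
query API instead of the Dataflow UI. It assumes "pipeline" is an already
constructed Pipeline, and the "backlog" name filter is an assumption about
how the reader in [1] names its gauges; Dataflow's support for querying
gauges this way may also be limited.

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

PipelineResult result = pipeline.run();

// Query all metrics; the backlog values in [1] are reported as gauges.
MetricQueryResults metrics =
    result.metrics().queryMetrics(MetricsFilter.builder().build());

for (MetricResult<GaugeResult> gauge : metrics.getGauges()) {
  // Print only the gauges whose name mentions the backlog.
  if (gauge.getName().getName().contains("backlog")) {
    System.out.println(gauge.getName() + " = " + gauge.getAttempted().getValue());
  }
}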


Re: Beam SQL Nested Rows are flatten by Calcite

2020-02-24 Thread Talat Uyarer
Hi Rui,

I solved the issue. After the Calcite 1.21 upgrade, nested rows are no
longer flattened in the LogicalPlan.

Thanks for your help. I am going to create a patch for it.

Talat
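
A minimal sanity-check sketch, assuming the upgraded Calcite dependency and
the result/inputType variables from the test case quoted below: the
SELECT * output schema should keep "f_row" as a ROW field instead of being
flattened into its leaf fields.

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.apache.beam.sdk.schemas.Schema;

// The output schema should still contain the nested "f_row" field.
assertTrue(result.getSchema().hasField("f_row"));
assertEquals(
    Schema.TypeName.ROW,
    result.getSchema().getField("f_row").getType().getTypeName());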

On Sat, Feb 15, 2020 at 6:26 PM Rui Wang  wrote:

> Because Calcite flattened Row, BeamSQL didn't need to deal with the nested
> Row structure (as they were flattened in the LogicalPlan).
>
> It depends on how that patch works. Nested rows might not immediately work
> after you apply that patch.
>
>
> -Rui
>
> On Fri, Feb 14, 2020 at 3:14 PM Talat Uyarer 
> wrote:
>
>> Do you mean they were flattened by Calcite before, or does Beam flatten
>> them too?
>>
>>
>>
>> On Fri, Feb 14, 2020 at 1:21 PM Rui Wang  wrote:
>>
>>> Nested row types might be less well supported (e.g. Row) because they
>>> were flattened before anyway.
>>>
>>>
>>> -Rui
>>>
>>> On Fri, Feb 14, 2020 at 12:14 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Thank you for your response.
>>>> I saw it and applied the patch on Calcite 1.20. However, I realized
>>>> BeamCalcRel does not generate the right code [1] to convert back to Beam
>>>> types. I am working on that now. Please let me know if Apache Beam
>>>> supports nested row types and I have missed it.
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/646f596988be9d6a739090f48d2fed07c8dfc17c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L167
>>>>
>>>> On Fri, Feb 14, 2020 at 10:33 AM Rui Wang  wrote:
>>>>
>>>>> Calcite has been improved to reconstruct ROW in the output. See [1].
>>>>> Beam needs to update its Calcite dependency to > 1.21 to adopt that.
>>>>>
>>>>>
>>>>>
>>>>> [1]: https://jira.apache.org/jira/browse/CALCITE-3138
>>>>>
>>>>>
>>>>> -Rui
>>>>>
>>>>> On Thu, Feb 13, 2020 at 9:05 PM Talat Uyarer <
>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying out Beam SQL, but something is wrong. I have nested row
>>>>>> records. I read them as a PCollection, apply a SELECT * query, and
>>>>>> compare the output with the initial rows. It looks like nested rows are
>>>>>> flattened by Calcite. Do you have any idea how I can avoid this?
>>>>>>
>>>>>> I added a test case for my issue:
>>>>>>
>>>>>> Schema nestedSchema =
>>>>>> Schema.builder()
>>>>>> .addInt32Field("f_nestedInt")
>>>>>> .addStringField("f_nestedString")
>>>>>> .addInt32Field("f_nestedIntPlusOne")
>>>>>> .build();
>>>>>> Schema inputType =
>>>>>> Schema.builder().addInt32Field("f_int").addRowField("f_row", 
>>>>>> nestedSchema).build();
>>>>>>
>>>>>> PCollection<Row> input =
>>>>>> pipeline.apply(
>>>>>> Create.of(
>>>>>> Row.withSchema(inputType)
>>>>>> .addValues(
>>>>>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>>>>> 313).build())
>>>>>> .build())
>>>>>> .withRowSchema(inputType))
>>>>>> .setRowSchema(inputType);
>>>>>>
>>>>>> PCollection<Row> result =
>>>>>> input
>>>>>> .apply(
>>>>>> SqlTransform.query(
>>>>>> "SELECT * FROM PCOLLECTION"));
>>>>>>
>>>>>> PAssert.that(result)
>>>>>> .containsInAnyOrder(Row.withSchema(inputType)
>>>>>> .addValues(
>>>>>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>>>>> 313).build())
>>>>>> .build());
>>>>>>
>>>>>>
>>>>>> Thank you so much in advance.
>>>>>>
>>>>>>


Re: Beam SQL Nested Rows are flatten by Calcite

2020-02-14 Thread Talat Uyarer
Do you mean they were flattened by Calcite before, or does Beam flatten them
too?



On Fri, Feb 14, 2020 at 1:21 PM Rui Wang  wrote:

> Nested row types might be less well supported (e.g. Row) because they were
> flattened before anyway.
>
>
> -Rui
>
> On Fri, Feb 14, 2020 at 12:14 PM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
>
>> Thank you for your response.
>> I saw it and applied the patch on Calcite 1.20. However, I realized
>> BeamCalcRel does not generate the right code [1] to convert back to Beam
>> types. I am working on that now. Please let me know if Apache Beam supports
>> nested row types and I have missed it.
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/646f596988be9d6a739090f48d2fed07c8dfc17c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L167
>>
>> On Fri, Feb 14, 2020 at 10:33 AM Rui Wang  wrote:
>>
>>> Calcite has been improved to reconstruct ROW in the output. See [1].
>>> Beam needs to update its Calcite dependency to > 1.21 to adopt that.
>>>
>>>
>>>
>>> [1]: https://jira.apache.org/jira/browse/CALCITE-3138
>>>
>>>
>>> -Rui
>>>
>>> On Thu, Feb 13, 2020 at 9:05 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying out Beam SQL, but something is wrong. I have nested row
>>>> records. I read them as a PCollection, apply a SELECT * query, and
>>>> compare the output with the initial rows. It looks like nested rows are
>>>> flattened by Calcite. Do you have any idea how I can avoid this?
>>>>
>>>> I added a test case for my issue:
>>>>
>>>> Schema nestedSchema =
>>>> Schema.builder()
>>>> .addInt32Field("f_nestedInt")
>>>> .addStringField("f_nestedString")
>>>> .addInt32Field("f_nestedIntPlusOne")
>>>> .build();
>>>> Schema inputType =
>>>> Schema.builder().addInt32Field("f_int").addRowField("f_row", 
>>>> nestedSchema).build();
>>>>
>>>> PCollection<Row> input =
>>>> pipeline.apply(
>>>> Create.of(
>>>> Row.withSchema(inputType)
>>>> .addValues(
>>>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>>> 313).build())
>>>> .build())
>>>> .withRowSchema(inputType))
>>>> .setRowSchema(inputType);
>>>>
>>>> PCollection<Row> result =
>>>> input
>>>> .apply(
>>>> SqlTransform.query(
>>>> "SELECT * FROM PCOLLECTION"));
>>>>
>>>> PAssert.that(result)
>>>> .containsInAnyOrder(Row.withSchema(inputType)
>>>> .addValues(
>>>> 1, Row.withSchema(nestedSchema).addValues(312, "CC", 
>>>> 313).build())
>>>> .build());
>>>>
>>>>
>>>> Thank you so much in advance.
>>>>
>>>>


Beam SQL Nested Rows are flatten by Calcite

2020-02-13 Thread Talat Uyarer
Hi,

I am trying out Beam SQL, but something is wrong. I have nested row records.
I read them as a PCollection, apply a SELECT * query, and compare the output
with the initial rows. It looks like nested rows are flattened by Calcite.
Do you have any idea how I can avoid this?

I added a test case for my issue:

Schema nestedSchema =
    Schema.builder()
        .addInt32Field("f_nestedInt")
        .addStringField("f_nestedString")
        .addInt32Field("f_nestedIntPlusOne")
        .build();
// Top-level schema with a nested row field "f_row".
Schema inputType =
    Schema.builder()
        .addInt32Field("f_int")
        .addRowField("f_row", nestedSchema)
        .build();

PCollection<Row> input =
    pipeline
        .apply(
            Create.of(
                    Row.withSchema(inputType)
                        .addValues(
                            1,
                            Row.withSchema(nestedSchema).addValues(312, "CC", 313).build())
                        .build())
                .withRowSchema(inputType))
        .setRowSchema(inputType);

PCollection<Row> result =
    input.apply(SqlTransform.query("SELECT * FROM PCOLLECTION"));

// Expect the nested row to be preserved in the SELECT * output.
PAssert.that(result)
    .containsInAnyOrder(
        Row.withSchema(inputType)
            .addValues(
                1, Row.withSchema(nestedSchema).addValues(312, "CC", 313).build())
            .build());
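
For illustration only (this is a hypothetical schema, not output from the
run above): with the flattening, the SELECT * result ends up with a schema
roughly like the one below instead of the nested inputType, which is why
the PAssert fails.

Schema flattenedType =
    Schema.builder()
        .addInt32Field("f_int")
        .addInt32Field("f_nestedInt")
        .addStringField("f_nestedString")
        .addInt32Field("f_nestedIntPlusOne")
        .build();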


Thank you so much in advance.


Limit log files count with Dataflow Runner Logging

2019-08-28 Thread Talat Uyarer
Hi All,

This is my first message to this mailing list. Please let me know if I am
sending it to the wrong list.

My stream processing jobs are running on the Google Cloud Dataflow engine.
For logging I am using Stackdriver; I added slf4j-jdk14 and slf4j-api as
runtime dependencies to enable it. However, my pipeline creates lots of logs
and my instances are running out of disk space. I looked for log rotation
and a limit on the number of log files, but I could not find any settings
for them. How can I configure this?

Thanks
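
One possible way to reduce the log volume itself, sketched under the
assumption that the Dataflow runner's DataflowWorkerLoggingOptions interface
is available (Dataflow manages the worker log files, so this attacks volume
rather than rotation). The "com.example.pipeline" package name is a
placeholder.

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// args: the main() arguments of the pipeline launcher.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

DataflowWorkerLoggingOptions logging = options.as(DataflowWorkerLoggingOptions.class);
// Log only warnings and above by default to keep the worker log files small.
logging.setDefaultWorkerLogLevel(Level.WARN);
// Keep INFO for your own code.
logging.setWorkerLogLevelOverrides(
    new WorkerLogLevelOverrides().addOverrideForName("com.example.pipeline", Level.INFO));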