[java] Trouble with gradle and using ParquetIO

2023-04-20 Thread Evan Galpin
Hi all,

I'm trying to make use of ParquetIO. Based on what's documented in Maven
Central, I'm including the artifact in "compileOnly" mode (or in Maven
parlance, 'provided' scope). I can successfully compile my pipeline, but
when I run it I am (intuitively?) met with a ClassNotFoundException for
ParquetIO.

Is 'compileOnly' still the desired way to include ParquetIO as a pipeline
dependency?

Thanks,
Evan


Re: Can I batch data when i use JDBC write operation?

2023-04-20 Thread Juan Romero
Hi. Can someone help me with this?

On Wed, Apr 19, 2023 at 15:08, Juan Romero () wrote:

> Hi community.
>
> On this occasion I have a question regarding how to read a stream from Kafka
> and write batches of data with the JDBC connector. The idea is to overwrite
> a specific row if the row we want to insert has the same id and a greater
> load_date_time. The conceptual pipeline looks like this and it is working
> (keep in mind that the source will be a stream from Kafka):
>
> import typing
>
> import apache_beam as beam
> from apache_beam.io.jdbc import WriteToJdbc
>
> ExampleRow = typing.NamedTuple('ExampleRow', id=int, name=str, load_date_time=str)
>
>
> with beam.Pipeline() as p:
>   _ = (
>       p
>       | beam.Create([
>           ExampleRow(1, '', '2023-04-05 12:34:56'),
>           ExampleRow(1, 'yyyz', '2023-04-05 12:34:55'),
>       ]).with_output_types(ExampleRow)
>       | 'Write to jdbc' >> WriteToJdbc(
>           driver_class_name='org.postgresql.Driver',
>           jdbc_url='jdbc:postgresql://localhost:5432/postgres',
>           username='postgres',
>           password='postgres',
>           table_name='test',
>           connection_properties='stringtype=unspecified',
>           statement='INSERT INTO test \
>               VALUES(?,?,?) \
>               ON CONFLICT (id) \
>               DO UPDATE SET name = EXCLUDED.name, load_date_time = EXCLUDED.load_date_time \
>               WHERE EXCLUDED.load_date_time::timestamp > test.load_date_time::timestamp',
>       ))
>
> My question is: if I want to write a stream that comes from Kafka, how can
> I avoid the JDBC connector inserting the records with one statement per
> element, and instead insert the data in time-based batches? Probably the
> JDBC connector has some kind of internal intelligence for this, but I want
> to know what you think about it.
>
> Thank you!
>
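
For reference, a minimal Java sketch of the batching knobs involved: the Python WriteToJdbc above is a cross-language wrapper around Beam's Java JdbcIO, which already groups statements into JDBC batches per bundle, and in the Java SDK the batch size is configurable. The ExampleRow POJO and all values below are placeholders mirroring the snippet above (not code from this thread), and whether the batch-size option is exposed through the Python wrapper may depend on the Beam version.

```java
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;

public class BatchedUpsert {

  // Hypothetical POJO mirroring the ExampleRow NamedTuple above.
  public static class ExampleRow implements java.io.Serializable {
    public int id;
    public String name;
    public String loadDateTime;
  }

  public static void write(PCollection<ExampleRow> rows) {
    rows.apply(
        "Write to jdbc",
        JdbcIO.<ExampleRow>write()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/postgres")
                    .withUsername("postgres")
                    .withPassword("postgres"))
            .withStatement(
                "INSERT INTO test VALUES(?,?,?) "
                    + "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, "
                    + "load_date_time = EXCLUDED.load_date_time "
                    + "WHERE EXCLUDED.load_date_time::timestamp > test.load_date_time::timestamp")
            // Rows are buffered and flushed as a single JDBC batch once this many
            // elements accumulate (or the bundle ends).
            .withBatchSize(500)
            .withPreparedStatementSetter(
                (element, statement) -> {
                  statement.setInt(1, element.id);
                  statement.setString(2, element.name);
                  statement.setString(3, element.loadDateTime);
                }));
  }
}
```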


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-20 Thread Jan Lukavský

Hi Ning,

I might have missed that in the discussion, but are we talking about batch 
execution? In streaming, all operators (PTransforms) of a Pipeline are run 
in the same slots, so the downsides are limited. You can enforce streaming 
mode using the --streaming command-line argument, though yes, this might 
have other implications. For batch, it obviously makes sense to limit 
parallelism only at the level of a (fused) 'stage', which is not a 
transform-level concept, but rather a more complex union of transforms 
divided by shuffle barriers. Would you be willing to start a follow-up 
thread on the @dev mailing list for a deeper discussion?


 Jan
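
To make the pipeline-wide knobs being discussed concrete, here is a minimal sketch, assuming the Java SDK and the Flink runner; it only shows the existing FlinkPipelineOptions settings (default parallelism and streaming mode), since per-operator parallelism is not available this way.

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FlinkParallelismExample {
  public static void main(String[] args) {
    // --parallelism=... and --streaming can also be passed on the command line.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setParallelism(8);   // default parallelism for all operators
    options.setStreaming(true);  // enforce streaming execution

    Pipeline p = Pipeline.create(options);
    // ... build the pipeline ...
    p.run().waitUntilFinish();
  }
}
```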

On 4/20/23 19:18, Ning Kang via user wrote:

Hi Jan,

The approach works when your pipeline doesn't have too many operators. 
And the operator that needs the highest parallelism can only use at 
most #total_task_slots / #operators resources available in the cluster.


Another downside is wasted resources for other smaller operators who 
cannot make full use of task slots assigned to them. You might see 
only 1/10 tasks running while the other 9/10 tasks idle for an 
operator with parallelism 10, especially when it's doing some 
aggregation like a SUM.


One redeeming method is that, for operators following another operator 
with high fanout, we can explicitly add a Reshuffle to allow a higher 
parallelism. But this circles back to the first downside: if your 
pipeline has exponentially high fanout through it, setting a single 
parallelism for the whole pipeline is not ideal because it limits the 
scalability of your pipeline significantly.


Ning.


On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský  wrote:

Hi,

this topic was discussed many years ago and the conclusion there
was that setting the parallelism of individual operators via
FlinkPipelineOptions (or ResourceHints) would be possible, but would
be somewhat cumbersome. Although I understand that it "feels"
weird to have high parallelism for operators with small inputs,
does this actually bring any relevant performance impact? I always
use parallelism based on the largest operator in the Pipeline and
this seems to work just fine. Is there any particular need or
measurable impact of such an approach?

 Jan

On 4/19/23 17:23, Nimalan Mahendran wrote:

Same need here, using Flink runner. We are processing a
pcollection (extracting features per element) then combining
these into groups of features and running the next operator on
those groups.

Each group contains ~50 elements, so the parallelism of the
operator upstream of the groupby should be higher, to be balanced
with the downstream operator.

On Tue, Apr 18, 2023 at 19:17 Jeff Zhang  wrote:

Hi Reuven,

It would be better to set parallelism for operators, as I
mentioned before, there may be multiple groupby, join
operators in one pipeline, and their parallelism can be
different due to different input data sizes.

On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax 
wrote:

Jeff - does setting the global default work for you, or
do you need per-operator control? Seems like it would be natural
to add this to ResourceHints.

On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw
 wrote:

Yeah, I don't think we have a good per-operator API
for this. If we were to add it, it probably belongs
in ResourceHints.

On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax
 wrote:

Looking at FlinkPipelineOptions, there is a
parallelism option you can set. I believe this
sets the default parallelism for all Flink operators.

On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang
 wrote:

Thanks Holden, this would work for Spark, but
Flink doesn't have such kind of mechanism, so
I am looking for a general solution on the
beam side.

On Mon, Apr 17, 2023 at 10:08 AM Holden Karau
 wrote:

To a (small) degree Sparks “new” AQE
might be able to help depending on what
kind of operations Beam is compiling it
down to.

Have you tried setting
spark.sql.adaptive.enabled &
spark.sql.adaptive.coalescePartitions.enabled



On Mon, Apr 17, 2023 at 10:34 AM Reuven
Lax via user  wrote:

I see. Robert - what is the story for
parallelism controls on GBK with the
Spark or Flink runners?

  

Re: Beam shell sql with zeta

2023-04-20 Thread Wiśniowski Piotr

Thank you for the reply and the hint.

1. Yes, I did try with Calcite `ROW` too - `java.lang.NoSuchFieldException: 
head (state=,code=0)`, but on the transformation side `SELECT * FROM 
etl_raw LIMIT 1`. Maybe I need to refer directly to a field that I need 
instead of using `*`? Do you know off the top of your head what the syntax 
is to get the `repo_state.head.commit` value? I also tried adding more 
fields to the table definition, but no luck.


2.

> doesn't actually do anything on the SQL shell at query parse time

This is also my observation. What is the proper way to initialize Zeta? 
I tried `./bin/shell 
--plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner` 
and a few other variations, but it seems sqlline does not allow passing 
parameters to pipeline options.


I do not have any requirement specific for Zeta but it seems it has a 
bit more support for JSON functions than Calcite.


3. Any other ideas to work around this issue?

Thanks for support!

For context, I am trying to set up a POC with these SQL pipelines, as it 
could really speed up development and maintenance of our pipelines, and SQL 
requires little background knowledge to actually understand the logic (read 
the code). If this works it might be a great (killer) feature for Beam.


Best

Wisniowski Piotr



On 20.04.2023 20:52, Andrew Pilloud via user wrote:

set plannerName doesn't actually do anything on the SQL shell at query 
parse time; it will still use the Calcite parser. Have you tried 
Calcite SQL?


Support for structs is somewhat limited. I know there are bugs around 
nested structs and structs with single values.


Andrew

On Thu, Apr 20, 2023 at 9:26 AM Wiśniowski Piotr 
 wrote:


Hi,

I have a question regarding usage of Zeta with SQL extensions in SQL
shell. I try to:

```

SET runner = DirectRunner;
SET tempLocation = `/tmp/test/`;
SET streaming=`True`;
SET plannerName =
`org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner`;

CREATE EXTERNAL TABLE etl_raw(
 event_timestamp TIMESTAMP,
 event_type VARCHAR,
 message_id VARCHAR,
 tracking_source VARCHAR,
 tracking_version VARCHAR,
 `repo_state` STRUCT<`head` STRUCT<`commit` VARCHAR ,`name`
VARCHAR>>
)
TYPE pubsub
LOCATION 'projects/xxx/topics/xxx'
TBLPROPERTIES '{"format":"json"}';

```

But I get the error `parse failed: Encountered "STRUCT" `.

If I change the `STRUCT` to `ROW` (as in Calcite) the DDL passes, but
I still fail to receive data on

`SELECT * FROM etl_raw LIMIT 1;`, getting the exception
`java.lang.NoSuchFieldException: head (state=,code=0)`, even though I am
sure that the field is there in the JSON payload.

With the `repo_state` field commented out I am able to retrieve the data.
Unfortunately I do not have control over the payload structure, as it is a
3rd-party hook, so I cannot make it flat.

In general I am unable to parse a JSON message from Pub/Sub that has a
structured field.

Is anyone familiar with this part of Beam functionality?

Best regards

Wisniowski Piotr




Re: Beam shell sql with zeta

2023-04-20 Thread Andrew Pilloud via user
set plannerName doesn't actually do anything on the SQL shell at query
parse time; it will still use the Calcite parser. Have you tried Calcite
SQL?

Support for structs is somewhat limited. I know there are bugs around nested
structs and structs with single values.

Andrew

On Thu, Apr 20, 2023 at 9:26 AM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi,
>
> I have a question regarding usage of Zeta with SQL extensions in SQL
> shell. I try to:
>
> ```
>
> SET runner = DirectRunner;
> SET tempLocation = `/tmp/test/`;
> SET streaming=`True`;
> SET plannerName =
> `org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner`;
>
> CREATE EXTERNAL TABLE etl_raw(
>  event_timestamp TIMESTAMP,
>  event_type VARCHAR,
>  message_id VARCHAR,
>  tracking_source VARCHAR,
>  tracking_version VARCHAR,
>  `repo_state` STRUCT<`head` STRUCT<`commit` VARCHAR ,`name` VARCHAR>>
> )
> TYPE pubsub
> LOCATION 'projects/xxx/topics/xxx'
> TBLPROPERTIES '{"format":"json"}';
>
> ```
>
> But I get the error `parse failed: Encountered "STRUCT" `.
>
> If I change the `STRUCT` to `ROW` (as in Calcite) the DDL passes, but
> I still fail to receive data on
>
> `SELECT * FROM etl_raw LIMIT 1;`, getting the exception
> `java.lang.NoSuchFieldException: head (state=,code=0)`, even though I am
> sure that the field is there in the JSON payload.
>
> With the `repo_state` field commented out I am able to retrieve the data.
> Unfortunately I do not have control over the payload structure, as it is a
> 3rd-party hook, so I cannot make it flat.
>
> In general I am unable to parse a JSON message from Pub/Sub that has a
> structured field.
>
> Is anyone familiar with this part of Beam functionality?
>
> Best regards
>
> Wisniowski Piotr
>
>
>
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-20 Thread Ning Kang via user
Hi Jan,

The approach works when your pipeline doesn't have too many operators. And
the operator that needs the highest parallelism can only use at most
#total_task_slots / #operators resources available in the cluster.

Another downside is wasted resources for other smaller operators who cannot
make full use of task slots assigned to them. You might see only 1/10 tasks
running while the other 9/10 tasks idle for an operator with parallelism
10, especially when it's doing some aggregation like a SUM.

One redeeming method is that, for operators following another operator with
high fanout, we can explicitly add a Reshuffle to allow a higher
parallelism. But this circles back to the first downside: if your pipeline
has exponentially high fanout through it, setting a single parallelism for
the whole pipeline is not ideal because it limits the scalability of your
pipeline significantly.

Ning.
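
To make the Reshuffle workaround above concrete, here is a minimal, self-contained Java sketch; the fanout and downstream DoFns are placeholders (not from this thread), and the intent is only to show where the explicit Reshuffle goes.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class ReshuffleAfterFanout {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<String> lines = p.apply(Create.of("a b c", "d e f"));

    // High-fanout step: each input element produces many output elements.
    PCollection<String> words =
        lines.apply(
            "Explode",
            ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(@Element String line, OutputReceiver<String> out) {
                    for (String word : line.split(" ")) {
                      out.output(word);
                    }
                  }
                }));

    // Break fusion so the downstream work is redistributed instead of running
    // with the (lower) effective parallelism of the fanout step.
    PCollection<String> rebalanced = words.apply(Reshuffle.viaRandomKey());

    rebalanced.apply(
        "Downstream",
        ParDo.of(
            new DoFn<String, String>() {
              @ProcessElement
              public void processElement(@Element String word, OutputReceiver<String> out) {
                out.output(word.toUpperCase()); // placeholder for expensive per-element work
              }
            }));

    p.run().waitUntilFinish();
  }
}
```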


On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský  wrote:

> Hi,
>
> this topic was discussed many years ago and the conclusion there was that
> setting the parallelism of individual operators via FlinkPipelineOptions
> (or ResourceHints) would be possible, but somewhat cumbersome.
> Although I understand that it "feels" weird to have high parallelism for
> operators with small inputs, does this actually bring any relevant
> performance impact? I always use parallelism based on the largest operator
> in the Pipeline and this seems to work just fine. Is there any particular
> need or measurable impact of such an approach?
>
>  Jan
> On 4/19/23 17:23, Nimalan Mahendran wrote:
>
> Same need here, using Flink runner. We are processing a pcollection
> (extracting features per element) then combining these into groups of
> features and running the next operator on those groups.
>
> Each group contains ~50 elements, so the parallelism of the operator
> upstream of the groupby should be higher, to be balanced with the
> downstream operator.
>
> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang  wrote:
>
>> Hi Reuven,
>>
>> It would be better to set parallelism for operators, as I mentioned
>> before, there may be multiple groupby, join operators in one pipeline, and
>> their parallelism can be different due to different input data sizes.
>>
>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax  wrote:
>>
>>> Jeff - does setting the global default work for you, or do you need
>>> per-operator control? Seems like it would be natural to add this to ResourceHints.
>>>
>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw 
>>> wrote:
>>>
 Yeah, I don't think we have a good per-operator API for this. If we
 were to add it, it probably belongs in ResourceHints.

 On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax  wrote:

> Looking at FlinkPipelineOptions, there is a parallelism option you can
> set. I believe this sets the default parallelism for all Flink operators.
>
> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang  wrote:
>
>> Thanks Holden, this would work for Spark, but Flink doesn't have such
>> kind of mechanism, so I am looking for a general solution on the beam 
>> side.
>>
>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau 
>> wrote:
>>
>>> To a (small) degree Sparks “new” AQE might be able to help depending
>>> on what kind of operations Beam is compiling it down to.
>>>
>>> Have you tried setting spark.sql.adaptive.enabled &
>>> spark.sql.adaptive.coalescePartitions.enabled
>>>
>>>
>>>
>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>> user@beam.apache.org> wrote:
>>>
 I see. Robert - what is the story for parallelism controls on
 GBK with the Spark or Flink runners?

 On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang 
 wrote:

> No, I don't use dataflow, I use Spark & Flink.
>
>
> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax 
> wrote:
>
>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>> Spark and Flink - dynamically modifies the parallelism as the 
>> operator
>> runs, so there is no need to have such controls. In fact these 
>> specific
>> controls wouldn't make much sense for the way Dataflow implements 
>> these
>> operators.
>>
>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang 
>> wrote:
>>
>>> Just for performance tuning like in Spark and Flink.
>>>
>>>
>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>> user@beam.apache.org> wrote:
>>>
 What are you trying to achieve by setting the parallelism?

 On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang 
 wrote:

> Thanks Reuven, what I mean is to set the parallelism in
> operator level. And the input size of the operator is unknown at 
> 

Beam shell sql with zeta

2023-04-20 Thread Wiśniowski Piotr

Hi,

I have a question regarding usage of Zeta with SQL extensions in SQL 
shell. I try to:


```

SET runner = DirectRunner;
SET tempLocation = `/tmp/test/`;
SET streaming=`True`;
SET plannerName = 
`org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner`;


CREATE EXTERNAL TABLE etl_raw(
    event_timestamp TIMESTAMP,
    event_type VARCHAR,
    message_id VARCHAR,
    tracking_source VARCHAR,
    tracking_version VARCHAR,
    `repo_state` STRUCT<`head` STRUCT<`commit` VARCHAR ,`name` VARCHAR>>
)
TYPE pubsub
LOCATION 'projects/xxx/topics/xxx'
TBLPROPERTIES '{"format":"json"}';

```

But I get the error `parse failed: Encountered "STRUCT" `.

If I change the `STRUCT` to `ROW` (as in Calcite) the DDL passes, but 
I still fail to receive data on


`SELECT * FROM etl_raw LIMIT 1;`, getting the exception 
`java.lang.NoSuchFieldException: head (state=,code=0)`, even though I am 
sure that the field is there in the JSON payload.


With the `repo_state` field commented out I am able to retrieve the data. 
Unfortunately I do not have control over the payload structure, as it is a 
3rd-party hook, so I cannot make it flat.


In general I am unable to parse a JSON message from Pub/Sub that has a 
structured field.


Is anyone familiar with this part of Beam functionality?

Best regards

Wisniowski Piotr





Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries

2023-04-20 Thread Evan Galpin
For more info on splittable DoFn, there is a good resource on the Beam
blog [1].  Alexey has also shown a great alternative!

[1] https://beam.apache.org/blog/splittable-do-fn/
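
For anyone looking for the API itself: a splittable DoFn is not a separate class to instantiate but an ordinary DoFn with restriction-related methods, roughly like the minimal sketch below (Java SDK). This toy example just emits the offsets of its restriction and is not an Elasticsearch reader; the class and its logic are placeholders.

```java
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Minimal splittable DoFn: the restriction (an OffsetRange) describes the work
// for one element, and the runner may split it and process the parts in parallel.
@DoFn.BoundedPerElement
class EmitOffsetsFn extends DoFn<String, Long> {

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String element) {
    // One unit of work per character of the input element (placeholder logic).
    return new OffsetRange(0, element.length());
  }

  @ProcessElement
  public void processElement(
      @Element String element,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<Long> out) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
      out.output(i);
    }
  }
}
```

It is applied like any other DoFn, e.g. `input.apply(ParDo.of(new EmitOffsetsFn()))`; the splitting is handled by the runner.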

On Thu, Apr 20, 2023 at 9:08 AM Alexey Romanenko 
wrote:

> Some Java IO-connectors implement a class something like "class ReadAll
> extends PTransform<PCollection<Read>, PCollection<...>>” where
> “Read” is supposed to be configured dynamically. As a simple example, take
> a look at “SolrIO” [1]
>
> So, to support what you are looking for, the “ReadAll” pattern should be
> implemented for ElasticsearchIO.
>
> —
> Alexey
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
>
> On 19 Apr 2023, at 19:05, Murphy, Sean P. via user 
> wrote:
>
> I'm running into an issue using the ElasticsearchIO.read() to handle more
> than one instance of a query. My queries are being dynamically built as a
> PCollection based on an incoming group of values. I'm trying to see how
> to load the .withQuery() parameter which could provide this capability or
> any approach that provides flexibility.
>
> The issue is that ElasticsearchIO.read() method expects a PBegin input to
> start a pipeline, but it seems like I need access outside of a pipeline
> context somehow. PBegin represents the beginning of a pipeline, and it's
> required to create a pipeline that can read data from Elasticsearch using
> ElasticsearchIO.read().
>
> Can I wrap the ElasticsearchIO.read() call in a Create transform that
> creates a PCollection with a single element (e.g., PBegin) to simulate the
> beginning of a pipeline or something similar?
>
> Here is my naive attempt without accepting the reality of PBegin:
>PCollection queries = ... // a PCollection of Elasticsearch
> queries
>
> PCollection queryResults = queries.apply(
> ParDo.of(new DoFn() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> String query = c.element();
> PCollection results = c.pipeline()
> .apply(ElasticsearchIO.read()
> .withConnectionConfiguration(
>
> ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
> .withQuery(query));
> c.output(results);
> }
> })
> .apply(Flatten.pCollections()));
>
>
>
> In general I'm wondering, for any of the IO-related classes provided by Beam
> that conform to PBegin input -- if there is a means to introduce a collection.
>
>
>
> Here is one approach that might be promising:
> // Define a ValueProvider for a List
> ValueProvider> myListProvider =
> ValueProvider.StaticValueProvider.of(myList);
>
> // Use the ValueProvider to create a PCollection of Strings
> PCollection pcoll =
> pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of()));
>
> PCollection partitionData = PBegin.in(pipeline)
> .apply("Read data from Elasticsearch", 
> ElasticsearchIO.read().withConnectionConfiguration(connConfig).withQuery(ValueProvider
> pcoll).withScrollKeepalive("1m").withBatchSize(50))
> .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(),
> opt.getMedTagVersion(), opt.getNoteType()));
>
> Any thoughts or ideas would be great.   Thanks, ~Sean
>
>
>


Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries

2023-04-20 Thread Alexey Romanenko
Some Java IO-connectors implement a class something like "class ReadAll extends 
PTransform<PCollection<Read>, PCollection<...>>” where “Read” is 
supposed to be configured dynamically. As a simple example, take a look at 
“SolrIO” [1] 

So, to support what you are looking for, the “ReadAll” pattern should be 
implemented for ElasticsearchIO.

—
Alexey

[1] 
https://github.com/apache/beam/blob/master/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
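
To make the shape of that pattern concrete, here is a rough skeleton. The class name `EsReadAll` and the `executeQuery` helper are hypothetical placeholders, not existing Beam or ElasticsearchIO APIs; it only illustrates the structure described above, where read configurations arrive as pipeline elements instead of being fixed at construction time.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical sketch of the "ReadAll" pattern: queries are elements, so they
// can be built at runtime rather than at pipeline construction time.
class EsReadAll extends PTransform<PCollection<String>, PCollection<String>> {
  @Override
  public PCollection<String> expand(PCollection<String> queries) {
    return queries.apply(
        ParDo.of(
            new DoFn<String, String>() {
              @ProcessElement
              public void processElement(@Element String query, OutputReceiver<String> out) {
                // Placeholder: issue the query against Elasticsearch (e.g. via a
                // client) and emit each hit as a JSON string.
                for (String hit : executeQuery(query)) {
                  out.output(hit);
                }
              }
            }));
  }

  // Hypothetical helper; a real implementation would use an Elasticsearch client
  // and handle pagination/scrolling.
  private static Iterable<String> executeQuery(String query) {
    throw new UnsupportedOperationException("placeholder");
  }
}
```

Usage would then simply be `queries.apply(new EsReadAll())`.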

> On 19 Apr 2023, at 19:05, Murphy, Sean P. via user  
> wrote:
> 
> I'm running into an issue using the ElasticsearchIO.read() to handle more 
> than one instance of a query. My queries are being dynamically built as a 
> PCollection based on an incoming group of values. I'm trying to see how to 
> load the .withQuery() parameter which could provide this capability or any 
> approach that provides flexibility.
>  
> The issue is that ElasticsearchIO.read() method expects a PBegin input to 
> start a pipeline, but it seems like I need access outside of a pipeline 
> context somehow. PBegin represents the beginning of a pipeline, and it's 
> required to create a pipeline that can read data from Elasticsearch using 
> ElasticsearchIO.read().
>  
> Can I wrap the ElasticsearchIO.read() call in a Create transform that creates 
> a PCollection with a single element (e.g., PBegin) to simulate the beginning 
> of a pipeline or something similar?
>  
> Here is my naive attempt without accepting the reality of PBegin:
>PCollection queries = ... // a PCollection of Elasticsearch queries
>
> PCollection queryResults = queries.apply(
> ParDo.of(new DoFn() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> String query = c.element();
> PCollection results = c.pipeline()
> .apply(ElasticsearchIO.read()
> .withConnectionConfiguration(
> 
> ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
> .withQuery(query));
> c.output(results);
> }
> })
> .apply(Flatten.pCollections()));
>  
>  
> In general I'm wondering, for any of the IO-related classes provided by Beam 
> that conform to PBegin input -- if there is a means to introduce a collection.
> 
>  
> 
> Here is one approach that might be promising:
> 
> // Define a ValueProvider for a List
> ValueProvider> myListProvider = 
> ValueProvider.StaticValueProvider.of(myList);
>  
> // Use the ValueProvider to create a PCollection of Strings
> PCollection pcoll = pipeline.apply(Create.ofProvider(myListProvider, 
> ListCoder.of()));
>  
> PCollection partitionData = PBegin.in(pipeline)
> .apply("Read data from Elasticsearch", 
> ElasticsearchIO.read().withConnectionConfiguration(connConfig).withQuery(ValueProvider
>  pcoll).withScrollKeepalive("1m").withBatchSize(50))
> .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(), 
> opt.getMedTagVersion(), opt.getNoteType()));
>  
> Any thoughts or ideas would be great.   Thanks, ~Sean




Re: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries

2023-04-20 Thread Murphy, Sean P. via user
I’m not able to find any implementation of “SplittableDoFn”. All references I 
can find are to “Splittable DoFn”, so could you point me to the right version of 
the Apache Beam SDK that would have this?   Thanks, ~Sean

From: Evan Galpin 
Date: Wednesday, April 19, 2023 at 4:46 PM
To: user@beam.apache.org 
Cc: Murphy, Sean P. 
Subject: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), 
which expects a PBegin input and a means to handle a collection of queries
Yes unfortunately the ES IO connector is not built in a way that can work by 
taking inputs from a PCollection to issue Reads. The most scalable way to 
support this is to revisit the implementation of Elasticsearch Read transform 
and instead implement it as a SplittableDoFn.

On Wed, Apr 19, 2023 at 10:51 Shahar Frank 
mailto:srf...@gmail.com>> wrote:
Hi Sean,

I'm not an expert but I think the .withQuery() function is part of the 
build stage rather than the runtime stage.
This means that the way ElasticsearchIO was built, you can set the query while 
the pipeline is being built, but not at runtime, which means you cannot 
dynamically run the query based on the element being processed within the 
pipeline.

To do something like that the transformation must be designed more like the 
FileIO in this example: (From 
https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/FileIO.html)

 PCollection<KV<String, String>> filesAndContents = p
     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
     // withCompression can be omitted - by default compression is detected from the filename.
     .apply(FileIO.readMatches().withCompression(GZIP))
     .apply(MapElements
         // uses imports from TypeDescriptors
         .into(kvs(strings(), strings()))
         .via((ReadableFile f) -> {
           try {
             return KV.of(
                 f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
           } catch (IOException ex) {
             throw new RuntimeException("Failed to read the file", ex);
           }
         }));

If you look at how FileIO.readMatches() works - it doesn't set the filename 
when building the pipeline but rather accepts that within the ProcessElement 
function.

See 
here

Does that make sense?

Cheers,
Shahar.


Shahar Frank

srf...@gmail.com

+447799561438
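
One practical way to apply Shahar's point (configure the query at runtime inside ProcessElement) without changes to ElasticsearchIO is a plain DoFn that issues each query itself via the Elasticsearch low-level REST client. This is an illustrative sketch only, not the ElasticsearchIO connector; the host, port, and index name are placeholder assumptions, and a real version would need error handling and scroll/pagination.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

// Executes each incoming query string against Elasticsearch and emits the raw
// JSON response body.
class QueryEsFn extends DoFn<String, String> {
  private transient RestClient client;

  @Setup
  public void setup() {
    // Placeholder endpoint; adjust to the actual cluster.
    client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
  }

  @ProcessElement
  public void processElement(@Element String query, OutputReceiver<String> out) throws Exception {
    Request request = new Request("GET", "/my-index/_search"); // placeholder index
    request.setJsonEntity(query);
    Response response = client.performRequest(request);
    out.output(EntityUtils.toString(response.getEntity()));
  }

  @Teardown
  public void teardown() throws Exception {
    if (client != null) {
      client.close();
    }
  }
}
```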







On Wed, 19 Apr 2023 at 18:05, Murphy, Sean P. via user <user@beam.apache.org> wrote:
I'm running into an issue using the ElasticsearchIO.read() to handle more than 
one instance of a query. My queries are being dynamically built as a 
PCollection based on an incoming group of values. I'm trying to see how to load 
the .withQuery() parameter which could provide this capability or any approach 
that provides flexibility.

The issue is that ElasticsearchIO.read() method expects a PBegin input to start 
a pipeline, but it seems like I need access outside of a pipeline context 
somehow. PBegin represents the beginning of a pipeline, and it's required to 
create a pipeline that can read data from Elasticsearch using 
ElasticsearchIO.read().

Can I wrap the ElasticsearchIO.read() call in a Create transform that creates a 
PCollection with a single element (e.g., PBegin) to simulate the beginning of a 
pipeline or something similar?

Here is my naive attempt without accepting the reality of PBegin:
   PCollection queries = ... // a PCollection of Elasticsearch queries

PCollection queryResults = queries.apply(
ParDo.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c) {
String query = c.element();
PCollection results = c.pipeline()
.apply(ElasticsearchIO.read()
.withConnectionConfiguration(

ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
.withQuery(query));
c.output(results);
}
})
.apply(Flatten.pCollections()));


In general I'm wondering, for any of the IO-related classes provided by Beam 
that conform to PBegin input -- if there is a means to introduce a collection.

Here is one approach that might be promising:
// Define a ValueProvider for a List
ValueProvider> myListProvider = 
ValueProvider.StaticValueProvider.of(myList);

// Use the ValueProvider to create a PCollection of Strings
PCollection pcoll = pipeline.apply(Create.ofProvider(myListProvider, 
ListCoder.of()));

PCollection partitionData = PBegin.in(pipeline)
.apply("Read data from Elasticsearch", 
ElasticsearchIO.read().withConnectionConfiguration(connConfig).withQuery(ValueProvider
 pcoll).withScrollKeepalive("1m").withBatchSize(50))
  

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-20 Thread Jan Lukavský

Hi,

this topic was discussed many years ago and the conclusion there was 
that setting the parallelism of individual operators via 
FlinkPipelineOptions (or ResourceHints) would be possible, but 
somewhat cumbersome. Although I understand that it "feels" weird to have 
high parallelism for operators with small inputs, does this actually 
bring any relevant performance impact? I always use parallelism based on 
the largest operator in the Pipeline and this seems to work just fine. 
Is there any particular need or measurable impact of such an approach?


 Jan

On 4/19/23 17:23, Nimalan Mahendran wrote:
Same need here, using Flink runner. We are processing a pcollection 
(extracting features per element) then combining these into groups of 
features and running the next operator on those groups.


Each group contains ~50 elements, so the parallelism of the operator 
upstream of the groupby should be higher, to be balanced with the 
downstream operator.


On Tue, Apr 18, 2023 at 19:17 Jeff Zhang  wrote:

Hi Reuven,

It would be better to set parallelism for operators, as I
mentioned before, there may be multiple groupby, join operators in
one pipeline, and their parallelism can be different due to
different input data sizes.

On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax  wrote:

Jeff - does setting the global default work for you, or do you
need per-operator control? Seems like it would be natural to add this
to ResourceHints.

On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw
 wrote:

Yeah, I don't think we have a good per-operator API for
this. If we were to add it, it probably belongs in
ResourceHints.

On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax
 wrote:

Looking at FlinkPipelineOptions, there is a
parallelism option you can set. I believe this sets
the default parallelism for all Flink operators.

On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang
 wrote:

Thanks Holden, this would work for Spark, but
Flink doesn't have such kind of mechanism, so I am
looking for a general solution on the beam side.

On Mon, Apr 17, 2023 at 10:08 AM Holden Karau
 wrote:

To a (small) degree Sparks “new” AQE might be
able to help depending on what kind of
operations Beam is compiling it down to.

Have you tried setting
spark.sql.adaptive.enabled &
spark.sql.adaptive.coalescePartitions.enabled



On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax
via user  wrote:

I see. Robert - what is the story for
parallelism controls on GBK with the Spark
or Flink runners?

On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang
 wrote:

No, I don't use dataflow, I use Spark
& Flink.


On Mon, Apr 17, 2023 at 8:08 AM Reuven
Lax  wrote:

Are you running on the Dataflow
runner? If so, Dataflow - unlike
Spark and Flink - dynamically
modifies the parallelism as the
operator runs, so there is no need
to have such controls. In fact
these specific controls wouldn't
make much sense for the way
Dataflow implements these operators.

On Sun, Apr 16, 2023 at 12:25 AM
Jeff Zhang  wrote:

Just for
performance tuning like in
Spark and Flink.


On Sun, Apr 16, 2023 at
1:10 PM Robert Bradshaw via
user  wrote:

What are you trying to
achieve by setting the
parallelism?

On Sat, Apr 15, 2023 at
5:13 PM Jeff Zhang
 wrote:

Thanks Reuven, what I
mean is to set the