Re: apache beam bigquery IO connector support for bigquery external tables

2023-04-21 Thread Brian Hulette via user
Hi Nirav,
BQ external tables are read-only, so you won't be able to write this way. I
also don't think reading a standard external table will work since the Read
API and tabledata.list are not supported for external tables [1].

BigLake tables [2], on the other hand, may "just work". I haven't
checked this, though, and it would still be read-only.

Brian

[1] https://cloud.google.com/bigquery/docs/external-tables#limitations
[2] https://cloud.google.com/bigquery/docs/biglake-intro
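
For reference, a minimal sketch of what reading a BigLake table from the Python SDK
might look like, assuming it really does behave like a regular table (untested, per
the caveat above; the project, dataset, and table names are placeholders):

import apache_beam as beam

# Placeholder table reference; BigLake support is unverified, and writes would
# still be rejected because the table is read-only.
with beam.Pipeline() as p:
    (p
     | "ReadBigLake" >> beam.io.ReadFromBigQuery(
         table="my-project:my_dataset.my_biglake_table",
         method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,  # Storage Read API
     )
     | "Print" >> beam.Map(print))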

On Tue, Apr 18, 2023 at 9:33 AM Nirav Patel  wrote:

> Beam's BigQuery IO supports reading from and writing to BQ tables. I assume
> it only supports BigQuery internal tables, but does anyone know whether it
> supports writing to and reading from BQ external tables (Parquet or
> Iceberg)? I think reading should be possible but haven't tried it myself.
>


Re: DeferredDataFrame not saved as feather file. Problem in both DirectRunner and DataFlowRunner

2022-11-10 Thread Brian Hulette via user
On Thu, Nov 10, 2022 at 9:39 AM Duarte Oliveira e Carmo <
duarteoca...@gmail.com> wrote:

> Thanks a lot for the help Brian, and for filing the bug about feather.
>
> Been using parquet and the pipeline is working perfectly.
>
> One extra question (if you’re willing): Is there any way to control the
> amount of sharding on the to_parquet call? (I tried adding n_shards=N, but
> it seems to have no effect.)
>

Unfortunately not, but this is a good feature request. I filed (another)
issue for this: https://github.com/apache/beam/issues/24094

>
> For extra context, when saving a large parquet output, I wish we could
> control whether it saves to 10 files instead of 1000, for example. This
> would make it easier to load back.
>
> Thanks for the help!!
>

Of course!

>
> On 10 Nov 2022 at 18.36.43, Brian Hulette  wrote:
>
>> +user  (adding back the user list)
>>
>> On Thu, Nov 10, 2022 at 9:34 AM Brian Hulette 
>> wrote:
>>
>>> Thanks, glad to hear that worked!
>>>
>>> The feather error looks like a bug, I filed an issue [1] to track it. I
>>> think using parquet instead of feather is the best workaround for now.
>>>
>>> [1] https://github.com/apache/beam/issues/24091
>>>
>>> On Thu, Nov 10, 2022 at 1:15 AM Duarte Oliveira e Carmo <
>>> duarteoca...@gmail.com> wrote:
>>>
>>>> Hey Brian!
>>>>
>>>> Wow. You are absolutely correct
>>>>
>>>> I hate my brain ahaha.
>>>>
>>>> Ended up indenting the block. Beam was still giving me problems with
>>>>
>>>> ValueError: feather does not support serializing a non-default index for 
>>>> the index; you can .reset_index() to make the index into column(s)
>>>>
>>>>
>>>>
>>>> And  reset_index() is not parallelizable.
>>>>
>>>> So I ended up going with parquet.
>>>>
>>>> Thanks for the help Brian! Really appreciate it 🙂
>>>>
>>>> Will also update stack overflow
>>>>
>>>>
>>>> Duarte O.Carmo
>>>> ML Engineer / Consultant
>>>> duarteocarmo.com
>>>>
>>>>
>>>> On 10 Nov 2022 at 01.41.29, Brian Hulette via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> Hi Duarte,
>>>>>
>>>>> I commented on the Stack Overflow question. It looks like your
>>>>> to_dataframe and to_feather calls are outside of the Pipeline context, so
>>>>> they are being created _after_ the pipeline has already run. Hopefully
>>>>> moving them inside the Pipeline context will resolve the issue.
>>>>>
>>>>> Brian
>>>>>
>>>>> On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo <
>>>>> duarteoca...@gmail.com> wrote:
>>>>>
>>>>>> Hi all!
>>>>>>
>>>>>> *Context: I posted
>>>>>> <https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>
>>>>>> this question before on stack overflow, but hoping to get more answers
>>>>>> here.  *
>>>>>>
>>>>>> I'm trying to compute a sentence-transformers
>>>>>> <https://github.com/UKPLab/sentence-transformers> model for various
>>>>>> rows stored in BigQuery, and then store them in a feather dataframe
>>>>>> in Google Cloud Storage.
>>>>>>
>>>>>> However, I'm having problems in saving the actual dataframe. I'm not
>>>>>> able to save it locally or in Google Cloud Storage, but get no error.
>>>>>>
>>>>>> Here's a reproducible example I've come up with:
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.ml.inference.base import (
>>>>>> ModelHandler,
>>>>>> PredictionResult,
>>>>>> RunInference,
>>>>>> )
>>>>>> from sentence_transformers import SentenceTransformer
>>>>>> import argparse
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>> from typing import Sequence, Optional, Any, Dict, Iterable
>>>>>> from apache_beam.ml.inference.base import KeyedModelHandler
>>>>>> from apache_beam.dataframe.convert import to_dataframe

Re: DeferredDataFrame not saved as feather file. Problem in both DirectRunner and DataFlowRunner

2022-11-10 Thread Brian Hulette via user
+user  (adding back the user list)

On Thu, Nov 10, 2022 at 9:34 AM Brian Hulette  wrote:

> Thanks, glad to hear that worked!
>
> The feather error looks like a bug, I filed an issue [1] to track it. I
> think using parquet instead of feather is the best workaround for now.
>
> [1] https://github.com/apache/beam/issues/24091
>
> On Thu, Nov 10, 2022 at 1:15 AM Duarte Oliveira e Carmo <
> duarteoca...@gmail.com> wrote:
>
>> Hey Brian!
>>
>> Wow. You are absolutely correct
>>
>> I hate my brain ahaha.
>>
>> Ended up indenting the block. Beam was still giving me problems with
>>
>> ValueError: feather does not support serializing a non-default index for the 
>> index; you can .reset_index() to make the index into column(s)
>>
>>
>>
>> And  reset_index() is not parallelizable.
>>
>> So I ended up going with parquet.
>>
>> Thanks for the help Brian! Really appreciate it 🙂
>>
>> Will also update stack overflow
>>
>>
>> Duarte O.Carmo
>> ML Engineer / Consultant
>> duarteocarmo.com
>>
>>
>> On 10 Nov 2022 at 01.41.29, Brian Hulette via user 
>> wrote:
>>
>>> Hi Duarte,
>>>
>>> I commented on the Stack Overflow question. It looks like your
>>> to_dataframe and to_feather calls are outside of the Pipeline context, so
>>> they are being created _after_ the pipeline has already run. Hopefully
>>> moving them inside the Pipeline context will resolve the issue.
>>>
>>> Brian
>>>
>>> On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo <
>>> duarteoca...@gmail.com> wrote:
>>>
>>>> Hi all!
>>>>
>>>> *Context: I posted
>>>> <https://stackoverflow.com/questions/74345723/how-to-save-deferreddataframe-to-feather-with-dataflowrunner>
>>>> this question before on stack overflow, but hoping to get more answers
>>>> here.  *
>>>>
>>>> I'm trying to compute a sentence-transformers
>>>> <https://github.com/UKPLab/sentence-transformers> model for various
>>>> rows stored in BigQuery, and then store them in a feather dataframe in
>>>> Google Cloud Storage.
>>>>
>>>> However, I'm having problems in saving the actual dataframe. I'm not
>>>> able to save it locally or in Google Cloud Storage, but get no error.
>>>>
>>>> Here's a reproducible example I've come up with:
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.ml.inference.base import (
>>>> ModelHandler,
>>>> PredictionResult,
>>>> RunInference,
>>>> )
>>>> from sentence_transformers import SentenceTransformer
>>>> import argparse
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from typing import Sequence, Optional, Any, Dict, Iterable
>>>> from apache_beam.ml.inference.base import KeyedModelHandler
>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>
>>>> ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"
>>>>
>>>>
>>>> class EmbeddingModelHandler(
>>>> ModelHandler[str, PredictionResult, SentenceTransformer]
>>>> ):
>>>> def __init__(self, model_name: str = ENCODING_MODEL_NAME):
>>>> self._model_name = model_name
>>>>
>>>> def load_model(self) -> SentenceTransformer:
>>>> from sentence_transformers import (
>>>> SentenceTransformer,
>>>> )  # <- These imports are needed otherwise GCP complains
>>>> import sentence_transformers
>>>>
>>>> return sentence_transformers.SentenceTransformer(self._model_name)
>>>>
>>>> def run_inference(
>>>> self,
>>>> batch: Sequence[str],
>>>> model: SentenceTransformer,
>>>> inference_args: Optional[Dict[str, Any]] = None,
>>>> ) -> Iterable[PredictionResult]:
>>>> from sentence_transformers import SentenceTransformer
>>>> import sentence_transformers
>>>>
>>>> embedding_matrix = model.encode(
>>>> batch, show_progress_bar=True, normalize_embeddings=True
>>>> )
>>>>
>>>> return embedding_matrix
>&

Re: DeferredDataFrame not saved as feather file. Problem in both DirectRunner and DataFlowRunner

2022-11-09 Thread Brian Hulette via user
Hi Duarte,

I commented on the Stack Overflow question. It looks like your to_dataframe
and to_feather calls are outside of the Pipeline context, so they are being
created _after_ the pipeline has already run. Hopefully moving them inside
the Pipeline context will resolve the issue.

Brian
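
For readers hitting the same problem, a condensed sketch of the fix: keep the
to_dataframe conversion and the write inside the pipeline context so they become
part of the pipeline that actually runs, and (per the follow-up messages in this
thread) prefer to_parquet over to_feather. The data here is illustrative, not the
original pipeline:

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe

with beam.Pipeline() as pipeline:
    rows = (
        pipeline
        | "Create" >> beam.Create([(1, 0.25), (2, 0.75)])
        | "ToRows" >> beam.Map(lambda kv: beam.Row(id=kv[0], score=kv[1]))
    )
    # Both calls happen before the `with` block exits, so the write transform
    # is attached to the pipeline before it executes.
    df = to_dataframe(rows)
    df.to_parquet("output.parquet")  # to_feather ran into a separate index error (see the follow-ups)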

On Wed, Nov 9, 2022 at 2:20 AM Duarte Oliveira e Carmo <
duarteoca...@gmail.com> wrote:

> Hi all!
>
> *Context: I posted
> 
> this question before on stack overflow, but hoping to get more answers
> here.  *
>
> I'm trying to compute a sentence-transformers
>  model for various rows
> stored in BigQuery, and then store them in a feather dataframe in Google
> Cloud Storage.
>
> However, I'm having problems in saving the actual dataframe. I'm not able
> to save it locally or in Google Cloud Storage, but get no error.
>
> Here's a reproducible example I've come up with:
>
> import apache_beam as beam
> from apache_beam.ml.inference.base import (
>     ModelHandler,
>     PredictionResult,
>     RunInference,
> )
> from sentence_transformers import SentenceTransformer
> import argparse
> from apache_beam.options.pipeline_options import PipelineOptions
> from typing import Sequence, Optional, Any, Dict, Iterable
> from apache_beam.ml.inference.base import KeyedModelHandler
> from apache_beam.dataframe.convert import to_dataframe
>
> ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"
>
>
> class EmbeddingModelHandler(
>     ModelHandler[str, PredictionResult, SentenceTransformer]
> ):
>     def __init__(self, model_name: str = ENCODING_MODEL_NAME):
>         self._model_name = model_name
>
>     def load_model(self) -> SentenceTransformer:
>         from sentence_transformers import (
>             SentenceTransformer,
>         )  # <- These imports are needed otherwise GCP complains
>         import sentence_transformers
>
>         return sentence_transformers.SentenceTransformer(self._model_name)
>
>     def run_inference(
>         self,
>         batch: Sequence[str],
>         model: SentenceTransformer,
>         inference_args: Optional[Dict[str, Any]] = None,
>     ) -> Iterable[PredictionResult]:
>         from sentence_transformers import SentenceTransformer
>         import sentence_transformers
>
>         embedding_matrix = model.encode(
>             batch, show_progress_bar=True, normalize_embeddings=True
>         )
>
>         return embedding_matrix
>
>
> class GetFeatures(beam.DoFn):
>     def process(self, element):
>         feature = element.get("overview", "")
>         iid = element.get("iid")
>         return [(iid, feature)]
>
>
> def run(argv=None):
>
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         "--output",
>         dest="output",
>         required=True,
>         help="Output file to write results to.",
>     )
>     known_args, pipeline_args = parser.parse_known_args(argv)
>     pipeline_options = PipelineOptions(pipeline_args)
>
>     with beam.Pipeline(options=pipeline_options) as pipeline:
>
>         embedding_dataframe = (
>             pipeline
>             | "Read BigQuery"
>             >> beam.io.ReadFromBigQuery(
>                 query="""SELECT text_to_embed,
>                          identifier
>                          FROM  [gcp-project:gcp-dataset.gcp-table]
>                          LIMIT 20
>                       """,
>                 project="gcp-project",
>                 gcs_location="gs://ml-apache-beam/tmp/",
>             )
>             | "Get features" >> beam.ParDo(GetFeatures())
>             | "Run inference"
>             >> RunInference(
>                 KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
>             )
>             | "To Rows"
>             >> beam.Map(
>                 lambda element: __import__("beam").Row(
>                     biggint=int(element[0]), embedding=element[1].tolist()
>                 )
>             )
>         )
>
>         df = to_dataframe(embedding_dataframe)
>         df.to_feather(known_args.output)
>
>
> if __name__ == "__main__":
>     run()
>
> And my requirements.txt:
>
> sentence-transformers==2.2.2
>
> With python 3.8.14
>
> To run it locally, I use:
>
> python beam_pipeline.py   --requirements_file requirements.txt  --output 
> embedding_output.feather
>
> Which runs fine, but I see no embedding_output.feather in the directory.
>
> And to run it on GCP:
>
> python beam_pipeline.py   --requirements_file requirements.txt  --output 
> "gs://my-bucket/embedding_output.feather" --runner DataflowRunner --project 
> my-gcp-project --region us-central1
>
> Also runs fine, but the gs://my-bucket/embedding_output.feather file is
> not there as well.
>
> I’m probably doing something wrong. Would love your thoughts 🙂
>
> Thanks!
>
> Duarte OC
>


Re: [Question] Beam SQL failed with NPE

2022-08-30 Thread Brian Hulette via user
Hi Zheng,
Could you share a minimal example that reproduces the issue?
Also, have you tried using Beam >2.35.0? I'm curious if this happens in the
2.41.0 release as well.

Brian

On Tue, Aug 30, 2022 at 11:10 AM Zheng Ni  wrote:

> Hi There,
>
> I am using Beam 2.35.0 to build a simple SQL-based pipeline but got the error
> below. I tried to debug into the code and could not find this class:
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.BuiltInMetadata.ExplainVisibility.Handler.
> Not sure if any lib is missing in my project.
>
> Thanks,
> Zheng
>
>
> POM:
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-extensions-sql</artifactId>
>   <version>2.35.0</version>
> </dependency>
>
>
> Stack trace:
> Caused by:
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.commons.compiler.InternalCompilerException:
> Compiling "GeneratedMetadata_ExplainVisibilityHandler" in Line 1, Column
> 14: null
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:369)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.janino.UnitCompiler.access$000(UnitCompiler.java:231)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.janino.UnitCompiler$1.visitCompilationUnit(UnitCompiler.java:333)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.janino.UnitCompiler$1.visitCompilationUnit(UnitCompiler.java:330)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.janino.Java$CompilationUnit.accept(Java.java:367)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:330)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:245)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:473)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:223)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:209)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.commons.compiler.Cookable.cook(Cookable.java:82)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:361)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.generateCompileAndInstantiate(JaninoRelMetadataProvider.java:182)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:75)
> at
> org.apache.beam.vendor.calcite.v1_28_0.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165)
> at
> org.apache.beam.vendor.calcite.v1_28_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
> at
> org.apache.beam.vendor.calcite.v1_28_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
> at
> org.apache.beam.vendor.calcite.v1_28_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
> at
> org.apache.beam.vendor.calcite.v1_28_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
> at
> org.apache.beam.vendor.calcite.v1_28_0.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
> at
> org.apache.beam.vendor.calcite.v1_28_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
> at
> org.apache.beam.vendor.calcite.v1_28_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:381)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:108)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.metadata.RelMetadataQuery.isVisibleInExplain(RelMetadataQuery.java:857)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.externalize.RelWriterImpl.explain_(RelWriterImpl.java:67)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.externalize.RelWriterImpl.done(RelWriterImpl.java:151)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:252)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptUtil.toString(RelOptUtil.java:2409)
> at
> org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptUtil.toString(RelOptUtil.java:2392)
> at
> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:189)
> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:112)
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.

Re: Using a non-AutoValue member with AutoValueSchema

2022-08-04 Thread Brian Hulette via user
In some places (e.g. in AutoValueSchema) we assume that nested
schema-inferred types are of the same "class". I filed [1] to track this a
while back - I think we should support mixing and matching SchemaProviders
for nested types.

[1] https://github.com/apache/beam/issues/20359

On Thu, Aug 4, 2022 at 2:45 PM Reuven Lax via user 
wrote:

> We do have JavaBeanSchema which might work, depending on whether your
> thrift class conforms to java beans.
>
> On Thu, Aug 4, 2022 at 2:06 PM Binh Nguyen Van  wrote:
>
>> Hi,
>>
>> I have an AutoValue class and it looks like this
>>
>> @AutoValue
>> @DefaultSchema(AutoValueSchema.class)
>> public abstract class MyClass {
>>     public abstract String getField1();
>>     public abstract MyThriftClass getField2();
>>
>>     public static Builder Builder() {
>>         return new AutoValue_MyClass.Builder();
>>     }
>>
>>     @AutoValue.Builder
>>     public abstract static class Builder {
>>         public abstract Builder setField1(String field1);
>>         public abstract Builder setField2(MyThriftClass field2);
>>         public abstract MyClass build();
>>     }
>> }
>>
>> MyThriftClass is not an AutoValue class and it inherits from
>> org.apache.thrift.TBase class.
>>
>> When I run a pipeline with a PCollection of elements that are instances
>> of this class, I got this error java.lang.IllegalStateException:
>> AutoValue generated class not found: com.foo.bar.AutoValue_MyThriftClass.
>>
>> My question is, is it possible to use a non-AutoValue member in an
>> AutoValue class like what I am doing now? If yes then how can I do it? If
>> no then what would be the alternatives?
>>
>> Thank you
>>
>> -Binh
>>
>>
>>


Re: [Question] Apache Beam v2.30 breaking change to BigQuery nested arrays of Maps

2022-07-18 Thread Brian Hulette via user
Yeah I think a minimal example would be really helpful. Then at the very
least we should be able to bisect to identify the breaking change. For
creating a test BigQuery database you might look at using the TestBigQuery
rule [1]. There are several usage examples in the Beam repo [2].

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
[2]
https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sql/datacatalog/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogBigQueryIT.java

On Sun, Jul 17, 2022 at 4:00 AM Jimmy Headdon 
wrote:

> Hello - I've had no joy investigating this further, nor finding the
> specific change in v2.30 that caused this break in behaviour.  I have
> tested with v2.40, to no avail.
>
> Would it be useful if I put together an example project and submitted it
> to this mailing list?  It would need a BigQuery database (or a mock,
> perhaps) to see this in action - any recommendations on how I could achieve
> this?
>
>
> Thanks again
>
> On Thu, 23 Jun 2022 at 13:30, Jimmy Headdon 
> wrote:
>
>> Hello everyone
>>
>> Thanks for your continued efforts on the project and for help with my
>> previous request!  I've run into an issue where upgrading to Beam v2.30 (or
>> any version up to and including v2.39) causes a breaking error when I
>> attempt to write an array of Map objects to Google's BigQuery:
>>
>> "Error while reading data, error message: JSON parsing error in row
>> starting at position 0: Nested arrays not allowed."
>>
>>
>> I'll explain my setup below, but for reference this behaviour worked
>> against Beam's Java library versions 2.19 through 2.29 (I checked them all,
>> and it definitely breaks from v2.30 onwards).
>>
>> *Pipeline Setup*
>> I have a Java pipeline that reads TableRow objects from BigQuery,
>> performs some fairly basic aggregation through Beam SQL and some in-memory
>> transformations, and writes the results back to a different table in
>> BigQuery.
>>
>> The initial BQ TableRow schema is a handful of STRING, INTEGER and
>> BOOLEAN objects, plus a single (nullable) RECORD field.  This RECORD field
>> is the one that causes problems, it is a REPEATED record with simple STRING
>> fields for each instance.
>>
>> When converting the BQ TableRow to Beam Row objects, I'm setting a row
>> schema for the above BQ RECORD as follows (I've also tried a Row field,
>> at the very bottom of this email, for reference):
>>
>> .addNullableField("experiments",
>> FieldType.array(FieldType.map(FieldType.STRING, FieldType.STRING)))
>>
>>
>> I have a custom ParDo function to convert the TableRow objects to Row
>> (at the very bottom of this email, for reference).  It creates a Map
>> instance for every repeated record from BQ, and adds it to an ArrayList which
>> is then added to the new Row through Row.withSchema(x).addValue (I've
>> also tried .addArray).
>>
>> When writing back to BigQuery (I use
>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableRow()) the
>> TableRow is created but then throws the aforementioned "Nested arrays
>> not allowed" error.
>>
>> I'm fairly confident the issue occurs in the initial conversion from
>> TableRow to Row, as I can print row.toString() and the outputs differ.
>> It's worth noting that printing the TableRow before conversion shows no
>> difference between versions.
>>
>> System.out.println(row.toString());
>>
>>
>> Here is the output for this RECORD field with Beam v2.29:
>>
>> [{"name": "forename", "type": "person"}]
>>
>>
>> And the same input data printed with Beam v2.30 (or higher):
>>
>> [[{"key":"name","value":"forename"},{"key":"type","value":"person"}]]
>>
>>
>> It has become a nested structure with Key and Value fields.
>>
>>
>> Nothing obvious stands out in the v2.30 release notes, I'm thinking it
>> could be an upstream library change or possibly an indirect Coder change?
>> Any ideas?
>>
>> Thanks again, let me know if I can provide any other snippets, input
>> data, etc.
>>
>>
>> *TableRow to Row Conversion*
>>
>> public ArrayList<Map<String, String>>
>> getRecords(ArrayList<Map<String, Object>> records) {
>>
>>     if (records == null) {
>>         return null;
>>     }
>>
>>     ArrayList<Map<String, String>> recordRows = new ArrayList<>();
>>
>>     for (Map<String, Object> record : records) {
>>         Map<String, String> e = new HashMap<>();
>>         e.put("name", (String) record.getOrDefault("name", ""));
>>         e.put("type", (String) record.getOrDefault("type", ""));
>>
>>         recordRows.add(e);
>>     }
>>
>>     return recordRows;
>> }
>>
>>
>> *Alternate Row Schema*
>>
>> public static Schema recordsRowSchema =
>> Schema.builder()
>>   .addNullableField("name", FieldType.STRING)
>>   .addNullableField("type", FieldType.STRING)
>> 

[ANNOUNCE] Apache Beam 2.37.0 Released

2022-03-09 Thread Brian Hulette
The Apache Beam team is pleased to announce the release of version 2.37.0.

Apache Beam is an open source unified programming model to define and
execute data processing pipelines, including ETL, batch and stream
(continuous) processing. See https://beam.apache.org

You can download the release here:

https://beam.apache.org/get-started/downloads/

This release includes bug fixes, features, and improvements detailed on the
Beam blog: https://beam.apache.org/blog/beam-2.37.0/

Thanks to everyone who contributed to this release, and we hope you enjoy
using Beam 2.37.0.

-- Brian Hulette, on behalf of The Apache Beam team


Re: Building a Schema from a file

2021-06-18 Thread Brian Hulette
Are the files in some special format that you need to parse and understand?
Or could you opt to store the schemas as proto descriptors or Avro avsc?

On Fri, Jun 18, 2021 at 10:40 AM Matthew Ouyang 
wrote:

> Hello Brian.  Thank you for the clarification request.  I meant the first
> case.  I have files that define field names and types.
>
> On Fri, Jun 18, 2021 at 12:12 PM Brian Hulette 
> wrote:
>
>> Could you clarify what you mean? I could interpret this two different
>> ways:
>> 1) Have a separate file that defines the literal schema (field names and
>> types).
>> 2) Infer a schema from data stored in some file in a structured format
>> (e.g csv or parquet).
>>
>> For (1) Reuven's suggestion would work. You could also use an Avro avsc
>> file here, which we also support.
>> For (2) we don't have anything like this in the Java SDK. In the Python
>> SDK the DataFrame API can do this though. When you use one of the pandas
>> sources with the Beam DataFrame API [1] we peek at the file and infer the
>> schema so you don't need to specify it. You'd just need to use
>> to_pcollection to convert the dataframe to a schema-aware PCollection.
>>
>> Brian
>>
>> [1]
>> https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html
>> [2]
>> https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection
>>
>> On Fri, Jun 18, 2021 at 7:50 AM Reuven Lax  wrote:
>>
>>> There is a proto format for Beam schemas. You could define it as a proto
>>> in a file and then parse it.
>>>
>>> On Fri, Jun 18, 2021 at 7:28 AM Matthew Ouyang 
>>> wrote:
>>>
>>>> I was wondering if there were any tools that would allow me to build a
>>>> Beam schema from a file?  I looked for it in the SDK but I couldn't find
>>>> anything that could do it.
>>>>
>>>


Re: Building a Schema from a file

2021-06-18 Thread Brian Hulette
Could you clarify what you mean? I could interpret this two different ways:
1) Have a separate file that defines the literal schema (field names and
types).
2) Infer a schema from data stored in some file in a structured format
(e.g csv or parquet).

For (1) Reuven's suggestion would work. You could also use an Avro avsc
file here, which we also support.
For (2) we don't have anything like this in the Java SDK. In the Python SDK
the DataFrame API can do this though. When you use one of the pandas
sources with the Beam DataFrame API [1] we peek at the file and infer the
schema so you don't need to specify it. You'd just need to use
to_pcollection to convert the dataframe to a schema-aware PCollection.

Brian

[1]
https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html
[2]
https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection
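
A small sketch of the Python DataFrame approach described for (2); the path is a
placeholder, and the column names and types come from the file itself:

import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection

with beam.Pipeline() as p:
    # The schema is inferred by peeking at the CSV, so no explicit schema is given.
    df = p | read_csv("gs://my-bucket/input*.csv")
    # to_pcollection yields a schema-aware PCollection of named tuples.
    rows = to_pcollection(df)
    rows | beam.Map(print)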

On Fri, Jun 18, 2021 at 7:50 AM Reuven Lax  wrote:

> There is a proto format for Beam schemas. You could define it as a proto
> in a file and then parse it.
>
> On Fri, Jun 18, 2021 at 7:28 AM Matthew Ouyang 
> wrote:
>
>> I was wondering if there were any tools that would allow me to build a
>> Beam schema from a file?  I looked for it in the SDK but I couldn't find
>> anything that could do it.
>>
>


Re: SqlTransform on windows using direct runner

2021-06-16 Thread Brian Hulette
Ah, this looks like a bug in Beam on Windows. It looks like
send_signal(signal.SIGINT) is not a cross-platform way to close a process.
We should probably use terminate or kill [1] here instead. I opened
BEAM-12501 [2] for this issue.

+dev  for awareness - I think this will affect most
external transforms in Python.

Thanks for letting us know about this Igor

[1]
https://docs.python.org/3/library/subprocess.html#subprocess.Popen.terminate
[2] https://issues.apache.org/jira/browse/BEAM-12501
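
To illustrate the failure mode outside Beam (a sketch, not the Beam code itself):
SIGINT is not among the signals Popen.send_signal accepts on Windows, whereas
terminate() works on both platforms.

import signal
import subprocess
import sys

proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
try:
    # Fine on POSIX; on Windows only CTRL_C_EVENT, CTRL_BREAK_EVENT and SIGTERM
    # are accepted, so this raises ValueError: Unsupported signal: 2.
    proc.send_signal(signal.SIGINT)
except ValueError:
    proc.terminate()  # cross-platform: SIGTERM on POSIX, TerminateProcess on Windows
proc.wait()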

On Tue, Jun 15, 2021 at 5:19 PM Igor Gois  wrote:

> Hi Brian,
>
> Thank you for your clarification.
>
> Actually, I am only trying to run a simple batch pipeline using the Sql
> transform locally. [1]
>

> The Kafka error didn't happen to me. I only mentioned it because I found
> the same error message on google.
>
> Here is the full error:
> Traceback (most recent call last):
>   File "beam-sql.py", line 18, in 
> |'sql print' >> beam.Map(print)
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\pvalue.py",
> line 142, in __or__
> return self.pipeline.apply(ptransform, self)
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\pipeline.py",
> line 641, in apply
> transform.transform, pvalueish, label or transform.label)
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\pipeline.py",
> line 651, in apply
> return self.apply(transform, pvalueish)
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\pipeline.py",
> line 694, in apply
> pvalueish_result = self.runner.apply(transform, pvalueish,
> self._options)
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\runners\runner.py",
> line 188, in apply
> return m(transform, input, options)
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\runners\runner.py",
> line 218, in apply_PTransform
> return transform.expand(input)
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\transforms\external.py",
> line 304, in expand
> pipeline.local_tempdir)
>   File
> "c:\users\XXX\appdata\local\programs\python\python37\lib\contextlib.py",
> line 119, in __exit__
> next(self.gen)
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\transforms\external.py",
> line 351, in _service
> yield stub
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\transforms\external.py",
> line 503, in __exit__
> self._service_provider.__exit__(*args)
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\utils\subprocess_server.py",
> line 72, in __exit__
> self.stop()
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\utils\subprocess_server.py",
> line 131, in stop
> self.stop_process()
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\utils\subprocess_server.py",
> line 181, in stop_process
> return super(JavaJarServer, self).stop_process()
>   File
> "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\utils\subprocess_server.py",
> line 141, in stop_process
> self._process.send_signal(signal.SIGINT)
>   File
> "c:\users\XXX\appdata\local\programs\python\python37\lib\subprocess.py",
> line 1306, in send_signal
> raise ValueError("Unsupported signal: {}".format(sig))
> ValueError: Unsupported signal: 2
>
> Thank you again and congratulations for the youtube video. It's very nice!
>
> [1]
> https://stackoverflow.com/questions/67977704/apache-beam-with-sqltransform-in-direct-runner/67990831#67990831
>
> Att,
>
> Igor Gois
>
>
>
>
>
>
> Am Di., 15. Juni 2021 um 19:56 Uhr schrieb Brian Hulette <
> bhule...@google.com>:
>
>> Hi Igor,
>>
>> "Universal Local Runner" is a term we've used in the past for a runner
>> that executes your pipeline locally. It's similar to each SDK's
>> DirectRunner, except that by leveraging portability we should only need one
>> implementation, making it "universal". I don't think we've been using that
>> term recently, I'm sorry I mentioned it in that talk and confused things.
>>
>> The Python DirectRunner is basically the ULR since it is a portable
>> runner. Unfortunatel

Re: SqlTransform on windows using direct runner

2021-06-15 Thread Brian Hulette
Hi Igor,

"Universal Local Runner" is a term we've used in the past for a runner that
executes your pipeline locally. It's similar to each SDK's DirectRunner,
except that by leveraging portability we should only need one
implementation, making it "universal". I don't think we've been using that
term recently, I'm sorry I mentioned it in that talk and confused things.

The Python DirectRunner is basically the ULR since it is a portable runner.
Unfortunately there's one big caveat: Python's portable DirectRunner (also
called FnApiRunner) doesn't support streaming right now. So when you use
the DirectRunner for a streaming Python pipeline, it ends up running on the
Python SDK's non-portable DirectRunner. I suspect that's the issue you're
running into here: SqlTransform and KafkaIO in Python both will only work
on portable runners, but you likely are trying to run a streaming pipeline
if you're using KafkaIO.

It's hard to tell for sure from that error message though, could you share
the full stacktrace?

Brian

On Tue, Jun 15, 2021 at 3:05 PM Igor Gois  wrote:

> Hi,
>
> I am trying to run Sql transform on windows using direct runner and apache
> beam (2.30.0):
>
> import apache_beam as beam
> from apache_beam.transforms.sql import SqlTransform
>
> with beam.Pipeline() as p:
>     pipe = (
>         p
>         | 'hello' >> beam.Create([('SE', 400), ('SC', 500)])
>         | 'schema' >> beam.Map(lambda x: beam.Row(
>             state=x[0],
>             population=x[1]
>         ))
>     )
>
>     sql = (
>         pipe
>         | 'sql' >> SqlTransform('SELECT state, population FROM PCOLLECTION')
>         | 'sql print' >> beam.Map(print)
>     )
>
>
> And I got this error:
>
>   File 
> "c:\users\XXX\appdata\local\programs\python\python37\lib\subprocess.py", line 
> 1306, in send_signal
> raise ValueError("Unsupported signal: {}".format(sig))
> ValueError: Unsupported signal: 2
>
>
> I followed this video on YouTube [1], and it mentions the Universal Local
> Runner (ULR), but I didn't find anything about it on [2]. I also found a
> similar error on [3], but I didn't figure out how to solve it, and it was
> related to Kafka.
>
> Can anyone help me?
>
> Thanks in advance
>
> [1] https://youtu.be/zx4p-UNSmrA?t=2097
> [2] https://beam.apache.org/documentation/runners/direct/
> [3]
> https://stackoverflow.com/questions/65780044/readfromkafka-throws-valueerror-unsupported-signal-2
>
>
> Igor Gois
>
>
>
>
>


Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Brian Hulette
> One thing that's been on the back burner for a long time is making
CoderProperties into a CoderTester like Guava's EqualityTester.

Reuven's point still applies here though. This issue is not due to a bug in
SchemaCoder, it's a problem with the Row we gave SchemaCoder to encode. I'm
assuming a CoderTester would require manually generating inputs right?
These input Rows represent an illegal state that we wouldn't test with.
(That being said I like the idea of a CoderTester in general)

Brian

On Wed, Jun 2, 2021 at 12:11 PM Kenneth Knowles  wrote:

> Mutability checking might catch that.
>
> I meant to suggest not putting the check in the pipeline, but offering a
> testing discipline that will catch such issues. One thing that's been on
> the back burner for a long time is making CoderProperties into a
> CoderTester like Guava's EqualityTester. Then it can run through all the
> properties without a user setting up test suites. Downside is that the test
> failure signal gets aggregated.
>
> Kenn
>
> On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette  wrote:
>
>> Could the DirectRunner just do an equality check whenever it does an
>> encode/decode? It sounds like it's already effectively performing
>> a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
>> the equality check.
>>
>> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax  wrote:
>>
>>> There is no bug in the Coder itself, so that wouldn't catch it. We could
>>> insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
>>> the Direct runner already does an encode/decode before that ParDo, then
>>> that would have fixed the problem before we could see it.
>>>
>>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles  wrote:
>>>
>>>> Would it be caught by CoderProperties?
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:
>>>>
>>>>> I don't think this bug is schema specific - we created a Java object
>>>>> that is inconsistent with its encoded form, which could happen to any
>>>>> transform.
>>>>>
>>>>> This does seem to be a gap in DirectRunner testing though. It also
>>>>> makes it hard to test using PAssert, as I believe that puts everything in 
>>>>> a
>>>>> side input, forcing an encoding/decoding.
>>>>>
>>>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette 
>>>>> wrote:
>>>>>
>>>>>> +dev 
>>>>>>
>>>>>> > I bet the DirectRunner is encoding and decoding in between, which
>>>>>> fixes the object.
>>>>>>
>>>>>> Do we need better testing of schema-aware (and potentially other
>>>>>> built-in) transforms in the face of fusion to root out issues like this?
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <
>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>
>>>>>>> I have some other work-related things I need to do this week, so I
>>>>>>> will likely report back on this over the weekend.  Thank you for the
>>>>>>> explanation.  It makes perfect sense now.
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>>>>>>>
>>>>>>>> Some more context - the problem is that RenameFields outputs (in
>>>>>>>> this case) Java Row objects that are inconsistent with the actual 
>>>>>>>> schema.
>>>>>>>> For example if you have the following schema:
>>>>>>>>
>>>>>>>> Row {
>>>>>>>>field1: Row {
>>>>>>>>   field2: string
>>>>>>>> }
>>>>>>>> }
>>>>>>>>
>>>>>>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>>>>>>
>>>>>>>> Row {
>>>>>>>>   field1: Row {
>>>>>>>>  renamed: string
>>>>>>>>}
>>>>>>>> }
>>>>>>>>
>>>>>>>> However the Java object for the _nested_ row will return the old
>>>>>>>> schema if getSchema() is called on it. This i

Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Brian Hulette
Could the DirectRunner just do an equality check whenever it does an
encode/decode? It sounds like it's already effectively performing
a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
the equality check.

On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax  wrote:

> There is no bug in the Coder itself, so that wouldn't catch it. We could
> insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
> the Direct runner already does an encode/decode before that ParDo, then
> that would have fixed the problem before we could see it.
>
> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles  wrote:
>
>> Would it be caught by CoderProperties?
>>
>> Kenn
>>
>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:
>>
>>> I don't think this bug is schema specific - we created a Java object
>>> that is inconsistent with its encoded form, which could happen to any
>>> transform.
>>>
>>> This does seem to be a gap in DirectRunner testing though. It also makes
>>> it hard to test using PAssert, as I believe that puts everything in a side
>>> input, forcing an encoding/decoding.
>>>
>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette 
>>> wrote:
>>>
>>>> +dev 
>>>>
>>>> > I bet the DirectRunner is encoding and decoding in between, which
>>>> fixes the object.
>>>>
>>>> Do we need better testing of schema-aware (and potentially other
>>>> built-in) transforms in the face of fusion to root out issues like this?
>>>>
>>>> Brian
>>>>
>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang 
>>>> wrote:
>>>>
>>>>> I have some other work-related things I need to do this week, so I
>>>>> will likely report back on this over the weekend.  Thank you for the
>>>>> explanation.  It makes perfect sense now.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>>>>>
>>>>>> Some more context - the problem is that RenameFields outputs (in this
>>>>>> case) Java Row objects that are inconsistent with the actual schema.
>>>>>> For example if you have the following schema:
>>>>>>
>>>>>> Row {
>>>>>>field1: Row {
>>>>>>   field2: string
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>>>>
>>>>>> Row {
>>>>>>   field1: Row {
>>>>>>  renamed: string
>>>>>>}
>>>>>> }
>>>>>>
>>>>>> However the Java object for the _nested_ row will return the old
>>>>>> schema if getSchema() is called on it. This is because we only update the
>>>>>> schema on the top-level row.
>>>>>>
>>>>>> I think this explains why your test works in the direct runner. If
>>>>>> the row ever goes through an encode/decode path, it will come back 
>>>>>> correct.
>>>>>> The original incorrect Java objects are no longer around, and new
>>>>>> (consistent) objects are constructed from the raw data and the 
>>>>>> PCollection
>>>>>> schema. Dataflow tends to fuse ParDos together, so the following ParDo 
>>>>>> will
>>>>>> see the incorrect Row object. I bet the DirectRunner is encoding and
>>>>>> decoding in between, which fixes the object.
>>>>>>
>>>>>> You can validate this theory by forcing a shuffle after RenameFields
>>>>>> using Reshufflle. It should fix the issue If it does, let me know and 
>>>>>> I'll
>>>>>> work on a fix to RenameFields.
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 7:39 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> Aha, yes this indeed another bug in the transform. The schema is set
>>>>>>> on the top-level Row but not on any nested rows.
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 6:37 PM Matthew Ouyang <
>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you everyone for your input.  I believe it will be easiest to
>>>>>>>> respond to 

Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Brian Hulette
>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax  wrote:
>>>>
>>>>> This transform is the same across all runners. A few comments on the
>>>>> test:
>>>>>
>>>>>   - Using attachValues directly is error prone (per the comment on the
>>>>> method). I recommend using the withFieldValue builders instead.
>>>>>   - I recommend capturing the RenameFields PCollection into a local
>>>>> variable of type PCollection and printing out the schema (which you
>>>>> can get using the PCollection.getSchema method) to ensure that the output
>>>>> schema looks like you expect.
>>>>>- RenameFields doesn't flatten. So renaming field0_1.field1_0 - >
>>>>> nestedStringField results in field0_1.nestedStringField; if you wanted to
>>>>> flatten, then the better transform would be
>>>>> Select.fieldNameAs("field0_1.field1_0", nestedStringField).
>>>>>
>>>>> This all being said, eyeballing the implementation of RenameFields
>>>>> makes me think that it is buggy in the case where you specify a top-level
>>>>> field multiple times like you do. I think it is simply adding the 
>>>>> top-level
>>>>> field into the output schema multiple times, and the second time is with
>>>>> the field0_1 base name; I have no idea why your test doesn't catch this in
>>>>> the DirectRunner, as it's equally broken there. Could you file a JIRA 
>>>>> about
>>>>> this issue and assign it to me?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles 
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Matthew,
>>>>>>>
>>>>>>> > The unit tests also seem to be disabled for this as well and so I
>>>>>>> don’t know if the PTransform behaves as expected.
>>>>>>>
>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing
>>>>>>> framework. NeedsRunner indicates that a test suite can't be executed 
>>>>>>> with
>>>>>>> the SDK alone, it needs a runner. So that exclusion just makes sure we
>>>>>>> don't run the test when we're verifying the SDK by itself in the
>>>>>>> :sdks:java:core:test task. The test is still run in other tasks where we
>>>>>>> have a runner, most notably in the Java PreCommit [1], where we run it 
>>>>>>> as
>>>>>>> part of the :runners:direct-java:test task.
>>>>>>>
>>>>>>> That being said, we may only run these tests continuously with the
>>>>>>> DirectRunner, I'm not sure if we test them on all the runners like we do
>>>>>>> with ValidatesRunner tests.
>>>>>>>
>>>>>>
>>>>>> That is correct. The tests are tests _of the transform_ so they run
>>>>>> only on the DirectRunner. They are not tests of the runner, which is only
>>>>>> responsible for correctly implementing Beam's primitives. The transform
>>>>>> should not behave differently on different runners, except for 
>>>>>> fundamental
>>>>>> differences in how they schedule work and checkpoint.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>>> > The error message I’m receiving, : Error while reading data,
>>>>>>> error message: JSON parsing error in row starting at position 0: No such
>>>>>>> field: nestedField.field1_0, suggests the BigQuery is trying to use
>>>>>>> the original name for the nested field and not the substitute name.
>>>>>>>
>>>>>>> Is there a stacktrace associated with this error? It would be
>>>>>>> helpful to see where the error is coming from.
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/Ren

Re: RenameFields behaves differently in DirectRunner

2021-06-01 Thread Brian Hulette
Hi Matthew,

> The unit tests also seem to be disabled for this as well and so I don’t
know if the PTransform behaves as expected.

The exclusion for NeedsRunner tests is just a quirk in our testing
framework. NeedsRunner indicates that a test suite can't be executed with
the SDK alone, it needs a runner. So that exclusion just makes sure we
don't run the test when we're verifying the SDK by itself in the
:sdks:java:core:test task. The test is still run in other tasks where we
have a runner, most notably in the Java PreCommit [1], where we run it as
part of the :runners:direct-java:test task.

That being said, we may only run these tests continuously with the
DirectRunner, I'm not sure if we test them on all the runners like we do
with ValidatesRunner tests.

> The error message I’m receiving, : Error while reading data, error
message: JSON parsing error in row starting at position 0: No such field:
nestedField.field1_0, suggests the BigQuery is trying to use the original
name for the nested field and not the substitute name.

Is there a stacktrace associated with this error? It would be helpful to
see where the error is coming from.

Brian


[1]
https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/

On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang 
wrote:

> I’m trying to use the RenameFields transform prior to inserting into
> BigQuery on nested fields.  Insertion into BigQuery is successful with
> DirectRunner, but DataflowRunner has an issue with renamed nested fields
>  The error message I’m receiving, : Error while reading data, error
> message: JSON parsing error in row starting at position 0: No such field:
> nestedField.field1_0, suggests the BigQuery is trying to use the original
> name for the nested field and not the substitute name.
>
> The code for RenameFields seems simple enough but does it behave
> differently in different runners?  Will a deep attachValues be necessary in
> order get the nested renames to work across all runners? Is there something
> wrong in my code?
>
>
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>
> The unit tests also seem to be disabled for this as well and so I don’t
> know if the PTransform behaves as expected.
>
>
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>
>
> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>
> package ca.loblaw.cerebro.PipelineControl;
>>
>> import com.google.api.services.bigquery.model.TableReference;
>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.schemas.Schema;
>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.values.Row;
>>
>> import java.io.File;
>> import java.util.Arrays;
>> import java.util.HashSet;
>> import java.util.stream.Collectors;
>>
>> import static java.util.Arrays.asList;
>>
>> public class BQRenameFields {
>>     public static void main(String[] args) {
>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>         DataflowPipelineOptions options =
>>             PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>         options.setFilesToStage(
>>             Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>                 .map(entry -> (new File(entry)).toString())
>>                 .collect(Collectors.toList()));
>>
>>         Pipeline pipeline = Pipeline.create(options);
>>
>>         Schema nestedSchema = Schema.builder()
>>             .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING))
>>             .build();
>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema.FieldType.STRING);
>>         Schema rowSchema = Schema.builder()
>>             .addFields(field, nested, runner)
>>             .build();
>>         Row testRow = Row.withSchema(rowSchema).attachValues(
>>             "value0_0",
>>             Row.withSchema(nestedSchema).attachValues("value1_0"),
>>             options.getRunner().toString());
>>         pipeline
>>             .apply(Create.of(testRow).withRowSchema(rowSchema))
>>             .apply(RenameFields.create()
>>                 .rename("field0_0", "stringField")

Re: A problem with nexmark build

2021-05-17 Thread Brian Hulette
Hm, it looks like there may be a bug in our Gradle config: it doesn't seem
to make a shaded jar for use with Spark (see this comment on the PR that
added this to the website [1]). Maybe we need to add a shadowJar
configuration to :sdks:java:testing:nexmark?

+dev  does anyone have context on this?

[1]
https://github.com/apache/beam/commit/2ae7950328cd27330befe0e64688230c83755137#r29690967

On Wed, May 12, 2021 at 4:06 PM Tao Li  wrote:

> Hi Beam community,
>
>
>
> I have been following this nexmark doc:
> https://beam.apache.org/documentation/sdks/java/testing/nexmark/
>
>
>
> I ran into a problem with “Running query 0 on a Spark cluster with Apache
> Hadoop YARN” section.
>
>
>
> I was following the instruction by running “./gradlew
> :sdks:java:testing:nexmark:assemble” command, but did not find the uber jar
> “build/libs/beam-sdks-java-nexmark-2.29.0-spark.jar” that was built locally
> (the nexmark doc is referencing that jar).
>
>
>
> Can someone provide some guidance and help? Thanks.
>
>
>
>
>


Re: Is there a way (seetings) to limit the number of element per worker machine

2021-05-17 Thread Brian Hulette
What type of files are you reading? If they can be split and read by
multiple workers this might be a good candidate for a Splittable DoFn (SDF).

Brian
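
For context, a minimal sketch of the splittable DoFn shape Brian mentions,
assuming each input element is a (file path, record count) pair and that records
can be read by offset; whether this fits depends on the actual file format:

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider


class RecordRangeProvider(RestrictionProvider):
    def initial_restriction(self, element):
        _, num_records = element
        return OffsetRange(0, num_records)

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return restriction.size()


class ProcessRecords(beam.DoFn):
    # Runners may split the [0, num_records) range, so one large file's work can
    # be spread across workers instead of landing on a single worker.
    def process(self, element, tracker=beam.DoFn.RestrictionParam(RecordRangeProvider())):
        path, _ = element
        pos = tracker.current_restriction().start
        while tracker.try_claim(pos):
            yield (path, pos)  # replace with reading/processing record `pos` of `path`
            pos += 1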

On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research 
wrote:

> Hi,
> I am running out of resources on the workers machines.
> The reasons are:
> 1. Every pcollection is a reference to a LARGE file that is copied into
> the worker
> 2. The worker makes calculations on the copied file using a software
> library that consumes memory / storage / compute resources
>
> I have changed the workers' CPUs and memory size. At some point, I am
> running out of resources with this method as well
> I am looking to limit the number of pCollection / elements that are being
> processed in parallel on each worker at a time.
>
> Many thanks for any advice,
> Best wishes,
> --
> Eila
> 
> Meetup 
>


Re: DirectRunner, Fusion, and Triggers

2021-05-17 Thread Brian Hulette
> P.S. I need this pipeline to work both on a distributed runner and also
on a local machine with many cores. That's why the performance of
DirectRunner is important to me.

IIUC the DirectRunner has intentionally made some trade-offs to make it
less performant, so that it better verifies pipelines under test. I wonder
if there's another runner you can use that is more performant for local
execution on a machine with many cores, like starting up a local Flink or
Spark cluster?

Brian
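
As one concrete option, a sketch of running a Python pipeline on an embedded local
Flink cluster (flag values are illustrative; this assumes a local Java installation
for the Flink job server):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",          # spins up an embedded Flink cluster locally
    "--environment_type=LOOPBACK",   # run user code in the launching process
    "--parallelism=8",               # use the machine's cores
])

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create(range(100))
     | beam.Map(lambda x: x * x)
     | beam.Map(print))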

On Mon, May 17, 2021 at 7:12 AM Jan Lukavský  wrote:

> On 5/17/21 3:46 PM, Bashir Sadjad wrote:
>
> Thanks Jan. Two points:
>
> - I was running all the experiments I reported with
> `--targetParallelism=1` to make sure concurrent threads do not mess up the
> logs.
>
> I think that is what causes what you see. Try to increase the parallelism
> to number higher than number of input bundles.
>
> - I have been tracking bundles too (see @StartBundle log messages in the
> mini-example in my previous reply to Kenn).
>
> I see the code, but not the log output. My suspicion would be, that you
> see "Start bundle" -> "Debug Input" OR "Debug NEXT", right? If yes, than
> this is expected - processing of a bundle produces "output bundle", which
> is queued into work queue and is then processed as soon as there is free
> worker to work on it. Fetching new outputs produces new bundles, which are
> also queued to this queue, which is what causes the interleave.
>
>
> So I don't think bundles alone describe what I see. In the mini-example,
> processing of INPUT bundles and NEXT bundles are interleaved, e.g., 3 INPUT
> bundles are processed, then the output of those go through NEXT, then a few
> other INPUT bundles and so on.
>
> Now, if we go back to my original example with S1->S2A->GBK->S2B->S3, the
> input to S2B also has many bundles. However in this case *all* of those
> bundles are processed first, then they all go through the next stages,
> e.g., the logging S2B' that I mentioned. So there is no interleaving of log
> messages.
>
> GBK is a stateful operation that has to wait for a trigger - in simple
> batch case the trigger is the end of input, which is why you cannot see
> outputs of GBK being interleaved with reading inputs. All inputs have had
> to be read before GBK can proceed and output any bundle downstream.
>
>
> Regards,
>
> -B
>
> On Mon, May 17, 2021 at 3:50 AM Jan Lukavský  wrote:
>
>> Hi Bashir,
>>
>> the behavior you describe should be expected. DirectRunner splits the
>> input work into bundles, processing each bundle might result in zero, one
>> or more new bundles. The executor executes the work associated with these
>> bundles, enqueuing new bundles into a queue, until there are no unprocessed
>> bundles left in the queue (that is, the work has been completely done). It
>> uses a fixed-size thread pool to consume and execute work associated with
>> these bundles (the size of which is defined by --targetParallelism), so
>> what happens is that the processing of bundles of "Sleep" transform and
>> "Next" transform are interleaved, but not due to fusion, but due to limited
>> parallelism. If you increase the parallelism beyond the total number of
>> bundles in your `lines` PCollection, then I think you would see the result
>> you expect.
>>
>> Best,
>>
>>  Jan
>> On 5/12/21 7:35 PM, Bashir Sadjad wrote:
>>
>> Thanks Kenn.
>>
>> On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles  wrote:
>>
>>>
>>> On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad  wrote:
>>>
 However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which
 only prints some log messages for each record and passes the record to
 output, then it seems S2 and S2' are fused. Because the log messages are
 interleaved with fetches.

>>>
 *Q1*: Does DirectRunner do any fusion optimization (e.g., like
 DataflowRunner)? If not by default, is there any way to enable it?

>>>
>>> The Java DirectRunner does not do any fusion optimization. There's no
>>> code to enable :-). It should affect performance only, not semantics. The
>>> DirectRunner is known to have poor performance, but mostly no one is
>>> working on speeding it up because it is really just for small-scale testing.
>>>
>>
>> Here is a minimal pipeline (with no windowing) that demonstrates what I
>> mean; maybe I am using the wrong terminology but when I run this pipeline
>> with DirectRunner (and with `--targetParallelism=1`) the `DEBUG INPUT` and
>> `DEBUG NEXT` messages are interleaved. While if there was no fusion, I
>> would have expected to see all `DEBUG INPUT` messages first and then all of
>> `DEBUG NEXT`:
>>
>> Pipeline pipeline = Pipeline.create(options);
>> PCollection<String> lines =
>> pipeline.apply(TextIO.read().from(options.getInputFile()));
>>
>> PCollection<String> linesDelayed = lines.apply("Sleep", ParDo.of(new
>> DoFn<String, String>() {
>>   @StartBundle
>>   public void startBundle() {
>> log.info("INPUT: Started a new bundle");
>>   }
>>   @ProcessElement
>>   public void Proce

Re: Unsubscribe

2021-05-17 Thread Brian Hulette
Hi Tarek, Pasan,

You can unsubscribe by writing to user-unsubscr...@beam.apache.org [1]

[1] https://apache.org/foundation/mailinglists.html#request-addresses-for-unsubscribing

On Sun, May 16, 2021 at 6:04 AM Pasan Kamburugamuwa <
pasankamburugamu...@gmail.com> wrote:

>
> On Sun, May 16, 2021, 18:33 TAREK ALSALEH 
> wrote:
>
>>
>>
>> Regards,
>> Tarek
>>
>


Re: unsubscribe

2021-05-06 Thread Brian Hulette
Hey Simon, you can unsubscribe by writing to
user-unsubscr...@beam.apache.org [1]

[1]
https://apache.org/foundation/mailinglists.html#request-addresses-for-unsubscribing

On Thu, May 6, 2021 at 2:24 PM Simon Gauld  wrote:

>
>


Re: New Beam Glossary

2021-05-05 Thread Brian Hulette
+user@beam for visibility

Thanks David!

-- Forwarded message -
From: David Huntsperger 
Date: Wed, May 5, 2021 at 10:25 AM
Subject: New Beam Glossary
To: 


Hey all,

We published a new Apache Beam glossary, to help new users learn
the terminology of the programming model. Feedback welcome!

Thanks,

David


Re: Query regarding support for ROLLUP

2021-05-05 Thread Brian Hulette
+Andrew Pilloud  do you know if this is a bug?

On Tue, May 4, 2021 at 7:38 AM D, Anup (Nokia - IN/Bangalore) <
anu...@nokia.com> wrote:

> Hi All,
>
>
>
> I was trying to use “GROUP BY WITH ROLLUP” (2.29.0 version) which I saw
> here -
> https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/#
>
>
>
> "select warehouse, SUM(quantity) as quantity from PCOLLECTION group by
> ROLLUP(warehouse)"));
>
>
>
> Warehouse | quantity
>
> -
>
> Melbourne |  100
>
> New York|  200
>
> New York| 200
>
>
>
> Output below seems to ignore ROLLUP.
>
>
>
> Warehouse | quantity
>
> ---
>
> Melbourne | 100
>
> New York| 400
>
>
>
> *Warehouse_Total | 500 => not generated*
>
>
>
> Could you please confirm if this is supported or I am missing something.
>
> I tried to search JIRA/documentation to get some pointers but could not
> find.
>
>
>
> Thanks
>
> Anup
>


Re: Question on printing out a PCollection

2021-04-30 Thread Brian Hulette
+Ning Kang  +Sam Rohde 

On Thu, Apr 29, 2021 at 6:13 PM Tao Li  wrote:

> Hi Beam community,
>
>
>
> The notebook console from Google Cloud defines a show() API to display a
> PCollection which is very neat:
> https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development
>
>
>
> If we are using a regular jupyter notebook to run beam app, how can we
> print out a PCollection easily? What’s the best practice? Thanks!
>
>
>


Re: Any easy way to extract values from PCollection?

2021-04-22 Thread Brian Hulette
I don't think there's an easy answer to this question, in general all you
can do with a PCollection is indicate you'd like to write it out to an IO.
There has been some work in the Python SDK on "Interactive Beam" which is
designed for using the Python SDK interactively in a notebook environment.
It will let you collect() a PCollection - meaning it runs the pipeline and
materializes the result. There's no such capability for the other SDKs
though.
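
For the Python case, a minimal sketch looks roughly like this (it assumes a
notebook/IPython environment with the interactive dependencies installed; the
pipeline here is just a placeholder):

```
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

p = beam.Pipeline(InteractiveRunner())
pcoll = p | beam.Create([1, 2, 3])

ib.show(pcoll)          # render the PCollection in the notebook
df = ib.collect(pcoll)  # materialize it as a pandas DataFrame
```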

On Wed, Apr 21, 2021 at 8:24 PM Tao Li  wrote:

> Hi Beam community,
>
>
>
> This is the question I am asking:
> https://stackoverflow.com/questions/28015924/how-to-extract-contents-from-pcollection-in-cloud-dataflow
>
>
>
> Thanks!
>


Re: [EXT] Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-04-20 Thread Brian Hulette
Hi Wenbing,
Sorry for taking so long to get back to you on this.
I discussed this with Robert offline and we came up with a potential
workaround - you could try writing out the Parquet file from within the
groupby.apply method. You can use beam's FileSystems abstraction to open a
Python file object referencing a cloud storage file, and pass that file
object directly to the pandas to_parquet. It would look something like this:

  df.groupby('key1').apply(lambda df:
      df.sort_values(by='key2').to_parquet(FileSystems.open("gs://bucket/file.pq")))
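
A slightly fuller, untested sketch of the same idea — note it assumes
FileSystems.create for obtaining a writable channel, and that one output file
per key is acceptable:

```
from apache_beam.io.filesystems import FileSystems

def write_group(pdf):
    key = pdf['key1'].iloc[0]
    fh = FileSystems.create('gs://bucket/out/key1=%s.parquet' % key)
    try:
        pdf.sort_values(by='key2').to_parquet(fh, engine='pyarrow', index=False)
    finally:
        fh.close()

result = df.groupby('key1').apply(write_group)
# Depending on how the rest of the pipeline is built, `result` may still need
# to be materialized (e.g. converted with to_pcollection) for the writes to run.
```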

If writing out sorted, partitioned parquet files is a common use-case we
should think about making this easier though. At the very least
partition_cols should work, I filed BEAM-12201 [1] for this. That alone
won't be enough as our implementation will likely reshuffle the dataset to
enforce the partitioning, removing any sorting that you've applied, so we'd
also need to think about how to optimize the pipeline to avoid that shuffle.

Brian

[1] https://issues.apache.org/jira/browse/BEAM-12201

On Wed, Apr 7, 2021 at 9:02 PM Wenbing Bai 
wrote:

> Thank you, Brian. I tried `partition_cols`, but it is not working. I tried
> pure pandas, it does work, so I am not sure if anything wrong with Beam.
>
> Wenbing
>
> On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette  wrote:
>
>> Hm, to_parquet does have a `partition_cols` argument [1] which we pass
>> through [2]. It would be interesting to see what  `partition_cols='key1'`
>> does - I suspect it won't work perfectly though.
>>
>> Do you have any thoughts here Robert?
>>
>> [1]
>> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
>> [2]
>> https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
>>
>> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai 
>> wrote:
>>
>>> Hi Robert and Brian,
>>>
>>> I tried groupby in my case. Here is my pipeline code. I do see all the
>>> data in the final parquet file are sorted in each group. However, I'd like
>>> to write each partition (group) to an individual file, how can I achieve
>>> it? In addition, I am using the master of Apache Beam SDK, how can I test
>>> the pipeline with DataflowRunner considering there is no dataflow worker
>>> image available?
>>>
>>> data = [
>>> {
>>> "key1": 1000 + i % 10,
>>> "key2": randrange(1),
>>> "feature_1": "somestring{}".format(i)
>>> } for i in range(1)
>>> ]
>>>
>>> class TestRow(typing.NamedTuple):
>>> key1: int
>>> key2: int
>>> feature_1: str
>>>
>>> with beam.Pipeline() as p:
>>> pcoll = (
>>> p
>>> | beam.Create(data)
>>> | beam.Map(lambda x:x).with_output_types(TestRow)
>>> )
>>>
>>> df = to_dataframe(pcoll)
>>> sorted_df = df.groupby('key1').apply(lambda df: df.sort_values(by='key2'))
>>> sorted_df.to_parquet('test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]), engine='pyarrow', index=False)
>>>
>>> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai 
>>> wrote:
>>>
>>>> Thank you, Robert and Brian.
>>>>
>>>> I'd like to try this out. I am trying to distribute my dataset to
>>>> nodes, sort each partition by some key and then store each partition to its
>>>> own file.
>>>>
>>>> Wenbing
>>>>
>>>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette 
>>>> wrote:
>>>>
>>>>> Note groupby.apply [1] in particular should be able to do what you
>>>>> want, something like:
>>>>>
>>>>>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>>>
>>>>> But as Robert noted we don't make any guarantees about preserving this
>>>>> ordering later in the pipeline. For this reason I actually just sent a PR
>>>>> to disallow sort_values on the entire dataset [2].
>>>>>
>>>>> Brian
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/13843
>>>>> [2] https://github.com/apache/beam/pull/14324
>>>>>
>>>>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> Thanks for trying this out.
>>>>>>
>>

Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-04-07 Thread Brian Hulette
Hm, to_parquet does have a `partition_cols` argument [1] which we pass
through [2]. It would be interesting to see what  `partition_cols='key1'`
does - I suspect it won't work perfectly though.
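
Concretely, the call under discussion would be something like this (untested —
the output path is a placeholder, and per the caveat above it may not preserve
the per-group sort order):

```
sorted_df.to_parquet(
    'gs://bucket/output/',
    engine='pyarrow',
    index=False,
    partition_cols=['key1'],
)
```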

Do you have any thoughts here Robert?

[1]
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
[2]
https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525

On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai 
wrote:

> Hi Robert and Brian,
>
> I tried groupby in my case. Here is my pipeline code. I do see all the
> data in the final parquet file are sorted in each group. However, I'd like
> to write each partition (group) to an individual file, how can I achieve
> it? In addition, I am using the master of Apache Beam SDK, how can I test
> the pipeline with DataflowRunner considering there is no dataflow worker
> image available?
>
> data = [
> {
> "key1": 1000 + i % 10,
> "key2": randrange(1),
> "feature_1": "somestring{}".format(i)
> } for i in range(1)
> ]
>
> class TestRow(typing.NamedTuple):
> key1: int
> key2: int
> feature_1: str
>
> with beam.Pipeline() as p:
> pcoll = (
> p
> | beam.Create(data)
> | beam.Map(lambda x:x).with_output_types(TestRow)
> )
>
> df = to_dataframe(pcoll)
> sorted_df = df.groupby('key1').apply(lambda df: df.sort_values(by='key2'))
> sorted_df.to_parquet('test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]), engine='pyarrow', index=False)
>
> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai 
> wrote:
>
>> Thank you, Robert and Brian.
>>
>> I'd like to try this out. I am trying to distribute my dataset to nodes,
>> sort each partition by some key and then store each partition to its own
>> file.
>>
>> Wenbing
>>
>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette  wrote:
>>
>>> Note groupby.apply [1] in particular should be able to do what you want,
>>> something like:
>>>
>>>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>
>>> But as Robert noted we don't make any guarantees about preserving this
>>> ordering later in the pipeline. For this reason I actually just sent a PR
>>> to disallow sort_values on the entire dataset [2].
>>>
>>> Brian
>>>
>>> [1] https://github.com/apache/beam/pull/13843
>>> [2] https://github.com/apache/beam/pull/14324
>>>
>>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw 
>>> wrote:
>>>
>>>> Thanks for trying this out.
>>>>
>>>> Better support for groupby (e.g.
>>>> https://github.com/apache/beam/pull/13843 ,
>>>> https://github.com/apache/beam/pull/13637) will be available in the
>>>> next Beam release (2.29, in progress, but you could try out head if you
>>>> want). Note, however, that Beam PCollections are by definition unordered,
>>>> so unless you sort a partition and immediately do something with it that
>>>> ordering may not be preserved. If you could let us know what you're trying
>>>> to do with this ordering that would be helpful.
>>>>
>>>> - Robert
>>>>
>>>>
>>>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai 
>>>> wrote:
>>>>
>>>>> Hi Beam users,
>>>>>
>>>>> I have a user case to partition my PCollection by some key, and then
>>>>> sort my rows within the same partition by some other key.
>>>>>
>>>>> I feel Beam Dataframe could be a candidate solution, but I cannot
>>>>> figure out how to make it work. Specifically, I tried df.groupby where I
>>>>> expect my data will be distributed to different nodes. I also tried
>>>>> df.sort_values, but it will sort my whole dataset, which is not what I 
>>>>> need.
>>>>>
>>>>> Can someone shed some light on this?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Wenbing Bai
>>>>>
>>>>> Senior Software Engineer
>>>>>
>>>>> Data Infrastructure, Cruise
>>>>>
>>>>> Pronouns: She/Her
>>>>>
>>>>>
>>>>>
>>>>> *Confidentiality Note:* We care about protecting our proprietary
>>>>> information, confidential material, and trade secrets. This message
>>>>> may contain some or all of those things. Cruise will suffer material harm
>>>>> if anyone other than the intended recipient disseminates or takes any
>>>>> action based on this message. If you have received this message (including
>>>>> any attachments) in error, please delete it immediately and notify the
>>>>> sender promptly.
>>>>
>>>>
>>
>> --
>>
>>
>>
>>
>>
>> Wenbing Bai
>>
>> Senior Software Engineer
>>
>> Data Infrastructure, Cruise
>>
>> Pronouns: She/Her
>>
>>
>
> --
>
>
>
>
>
> Wenbing Bai
>
> Senior Software Engineer
>
> Data Infrastructure, Cruise
>
> Pronouns: She/Her
>
>
>
> *Confidentiality Note:* We care about protecting our proprietary
> information, confidential material, and trade secrets. This message may
> contain some or all of those things. Cruise will suffer material harm if
> anyone other than the intended recipient disseminates or takes any action
> based on this message. If you have received this message (including any
> attachments) in error, please delete it immediately and notify the sender
> promptly.


Re: Beam Dataframe - sort and grouping

2021-04-02 Thread Brian Hulette
Note groupby.apply [1] in particular should be able to do what you want,
something like:

  df.groupby('key1').apply(lambda df: df.sort_values('key2'))

But as Robert noted we don't make any guarantees about preserving this
ordering later in the pipeline. For this reason I actually just sent a PR
to disallow sort_values on the entire dataset [2].

Brian

[1] https://github.com/apache/beam/pull/13843
[2] https://github.com/apache/beam/pull/14324

On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw  wrote:

> Thanks for trying this out.
>
> Better support for groupby (e.g. https://github.com/apache/beam/pull/13843
> , https://github.com/apache/beam/pull/13637) will be available in the
> next Beam release (2.29, in progress, but you could try out head if you
> want). Note, however, that Beam PCollections are by definition unordered,
> so unless you sort a partition and immediately do something with it that
> ordering may not be preserved. If you could let us know what you're trying
> to do with this ordering that would be helpful.
>
> - Robert
>
>
> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai 
> wrote:
>
>> Hi Beam users,
>>
>> I have a user case to partition my PCollection by some key, and then sort
>> my rows within the same partition by some other key.
>>
>> I feel Beam Dataframe could be a candidate solution, but I cannot figure
>> out how to make it work. Specifically, I tried df.groupby where I expect my
>> data will be distributed to different nodes. I also tried df.sort_values,
>> but it will sort my whole dataset, which is not what I need.
>>
>> Can someone shed some light on this?
>>
>>
>>
>>
>>
>> Wenbing Bai
>>
>> Senior Software Engineer
>>
>> Data Infrastructure, Cruise
>>
>> Pronouns: She/Her
>>
>>
>>
>> *Confidentiality Note:* We care about protecting our proprietary
>> information, confidential material, and trade secrets. This message may
>> contain some or all of those things. Cruise will suffer material harm if
>> anyone other than the intended recipient disseminates or takes any action
>> based on this message. If you have received this message (including any
>> attachments) in error, please delete it immediately and notify the sender
>> promptly.
>
>


Re: How to Parallelize Google Cloud Storage Blob Downloads with Grouping

2021-03-31 Thread Brian Hulette
I'm not very familiar with SDF so I can't comment on that approach.
Maybe +Boyuan
Zhang  would be helpful there.

What if FileIO could yield the glob that was matched along with each file?
Then you could use that as a grouping key later on.

Brian

On Wed, Mar 24, 2021 at 7:08 AM Evan Galpin  wrote:

> Hi all, I’m looking for some expertise :slightly_smiling_face: I
> realize I may not be using things as intended, and I welcome any feedback.
> I’m using the Java SDK, v2.28.0 and using both the Direct Runner (for
> development) and Dataflow Runner (for production). My pipeline is a
> streaming pipeline with Google PubSub as the source.
>
> TL;DR I’d like to be able to maintain a grouping of entities within a
> single PCollection element, but parallelize the fetching of those entities
> from Google Cloud Storage (GCS). PCollection<Iterable<String>> -->
> PCollection<Iterable<String>> where the starting PCollection is an
> Iterable of file paths and the resulting PCollection is Iterable of file
> contents. Alternatively, PCollection<String> -->
> PCollection<Iterable<String>> would also work and perhaps even be
> preferable, where the starting PCollection is a glob pattern, and the
> resulting PCollection is an iterable of file contents which matched the
> glob.
>
> My use-case is that at a point in my pipeline I have as input
> PCollection<String>. Each element of the PCollection is a GCS filepath
> glob pattern. It’s important that files which match the glob be grouped
> together because the content of the files–once *all* files in a group are
> read–need to be grouped downstream in the pipeline. I originally tried using
> FileIO.matchAll and a subsequent GroupByKey. However, the matchAll,
> window, and GroupByKey combination lacked any guarantee that all files
> matching the glob would be read and in the same window before performing
> the GroupByKey transform. It’s possible to achieve the desired results if
> a large WindowFn is applied, but it’s still probabilistic rather than a
> guarantee that all files will be read before grouping. It’s also the main
> goal of my pipeline to maintain the lowest possible latency.
>
> So my next, and currently operational, plan was to use an AsyncHttpClient
> to fan out fetching file contents via GCS HTTP API. I feel like this goes
> against the grain in Beam and is likely quite suboptimal in terms of
> parallelization.
>
> So I’ve started investigating SplittableDoFn . My current plan is to
> allow splitting such that each entity in the input Iterable (i.e. each
> matched file from the glob pattern) could be processed separately. The
> challenge I’ve encountered is: how do I go about grouping/gathering the
> split invocations back into a single output value in my DoFn? I’ve tried
> using stateful processing and using a BagState to collect file contents
> along the way, but I realized part way along that the ProcessElement method
> of a splittable DoFn may only accept ProcessContext and Restriction tuples,
> and no other args therefore no StateId args referring to a StateSpec.
>
> I noticed in the FilePatternWatcher example in the official SDF proposal
> doc[1] that a custom tracker was created wherein FilePath Objects kept in
> a set and presumably added to the set via tryClaim. This seems as though
> it could work for my use-case, but I don’t see/understand how to go about
> implementing a @SplitRestriction method using a custom RestrictionTracker.
>
> I would be very appreciative if anyone were able to offer advice. I have
> no preference for any particular solution, only that I want to achieve the
> ability to maintain a grouping of entities within a single PCollection
> element, but parallelize the fetching of those entities from Google Cloud
> Storage (GCS).
>
> Thanks!
> Evan
>
> [1] (
> https://docs.google.com/document/d/1AQmx-T9XjSi1PNoEp5_L-lT0j7BkgTbmQnc6uFEMI4c/edit#heading=h.19qhdetat7d9
> )
>


Re: Dataflow - Kafka error

2021-03-30 Thread Brian Hulette
+Chamikara Jayalath 

Could you try with beam 2.27.0 or 2.28.0? I think that this PR [1] may have
addressed the issue. It avoids the problematic code when the pipeline is
multi-language [2].

[1] https://github.com/apache/beam/pull/13536
[2]
https://github.com/apache/beam/blob/7eff49fae34e8d3c50716f5da14fa6bcc607fc67/sdks/python/apache_beam/pipeline.py#L524

On Tue, Mar 30, 2021 at 12:55 PM Maria-Irina Sandu 
wrote:

> I'm trying to write to a Kafka topic using WriteTokafka module from
> apache_beam.io.kafka.
> The error I get is:
>
>> File "predict.py", line 162, in 
>> run()
>> File "predict.py", line 158, in run
>> topic = 'int.fitbit_explore.video_recommendations'))
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 580, in __exit__
>> self.result = self.run()
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 530, in run
>> self._options).run(False)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 902, in from_runner_api
>> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 116, in get_by_id
>> self._id_to_proto[id], self._pipeline_context)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1252, in from_runner_api
>> part = context.transforms.get_by_id(transform_id)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 116, in get_by_id
>> self._id_to_proto[id], self._pipeline_context)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1252, in from_runner_api
>> part = context.transforms.get_by_id(transform_id)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 116, in get_by_id
>> self._id_to_proto[id], self._pipeline_context)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1252, in from_runner_api
>> part = context.transforms.get_by_id(transform_id)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 116, in get_by_id
>> self._id_to_proto[id], self._pipeline_context)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1252, in from_runner_api
>> part = context.transforms.get_by_id(transform_id)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 116, in get_by_id
>> self._id_to_proto[id], self._pipeline_context)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1229, in from_runner_api
>> transform = ptransform.PTransform.from_runner_api(proto, context)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
>> line 733, in from_runner_api
>> context)
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py",
>> line 1420, in from_runner_api_parameter
>> pardo_payload.do_fn, context).serialized_dofn_data())
>> File
>> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py",
>> line 1493, in from_runner_api
>> raise ValueError('Unexpected DoFn type: %s' % spec.urn)
>> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
>
>
> The pipeline looks like this:
>
>> pipeline_options = PipelineOptions(argv)
>> with beam.Pipeline(options=pipeline_options) as p:
>> _ = (p | 'Create' >> beam.Create(['Start'])
>> | 'Read MDAU' >>
>> beam.io.textio.ReadFromText("gs://fit-recommend-system-testpy/saved_model/dummy_mdau.txt")
>> | 'Predict' >> beam.ParDo(PredictDoFn())
>> | 'EncodeThrift' >> beam.ParDo(ThriftEncodeDoFn())
>> | 'WriteToKafka' >> WriteToKafka(producer_config = {'bootstrap.servers' :
>> ':9092'},
>> topic = ''))
>
>
> I replaced the bootstrap server and topic values with placeholders here
> because I'm not sure if I should show them or not.
>
> The ThriftEncodeDoFn function seems to work. It produces a tuple of bytes
> and it looks like this:
>
> class ThriftEncodeDoFn(beam.DoFn):  def encode(self, element):
> video = Vid

Re: [Question] Need to write a pipeline in Go consuming events from Kafka

2021-03-29 Thread Brian Hulette
+Robert Burke  any advice here?

On Wed, Mar 24, 2021 at 4:24 AM Đức Trần Tiến 
wrote:

> Hi,
>
> I am very very new to Go and Apache Beam too! This is my situation:
>  - I have a kafka running
>  - I want to write an etl pipeline that consuming data from the kafka in Go
>
> Because there is no Kafka support in the Go SDK, only Java SDK. I had been
> looking for a way to create an unbounded collection in Go then do some
> coding to consume the kafka, but no result! *beam.Create()* only supports
> to create a finite data source.
>
> Could you please guide me or give me some ideas to make progress? Either
> creating an unbounded collection or a straight way to create an etl
> pipeline consuming kafka.
>
> And the last question: Could I write that pipeline in Java and invoke that
> pipeline from Go? :D
>
> Thanks and regards,
>
> Duc Tran
>


Re: JdbcIO SQL best practice

2021-03-23 Thread Brian Hulette
FYI the schemas option has been pursued a little bit in
JdbcSchemaIOProvider [1], which naively generates SELECT and INSERT
statements for reads and writes. Practically, this code is only usable from
SQL, and multi-language pipelines (e.g. it's accessible from the python SDK
[2]). We could consider either:
- Moving this logic into JdbcIO and re-using it in JdbcSchemaIOProvider, or
- Adding a user-friendly interface to SchemaIOProvider implementations in
the Java SDK
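
For context, the Python-facing path in [2] looks roughly like this today (a
sketch — the connection details are placeholders, and it needs a local Java
install for the cross-language expansion):

```
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc

class ExampleRow(typing.NamedTuple):
    id: int
    name: str

coders.registry.register_coder(ExampleRow, coders.RowCoder)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([ExampleRow(1, 'foo')]).with_output_types(ExampleRow)
        | WriteToJdbc(
            table_name='example_table',
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://localhost:5432/example',
            username='user',
            password='password',
        )
    )
```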

Brian

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
[2]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/jdbc.py

On Tue, Mar 23, 2021 at 12:03 AM Thomas Fredriksen(External) <
thomas.fredrik...@cognite.com> wrote:

> That is a very good question.
>
> Personally, I would prefer that read and write were simplified. I guess
> there will always be a need for writing complex queries, but the vast
> majority of pipelines will only need to read or write data to or from a
> table. As such, having read/write functions that will take an input-class
> (BEAN or POJO for example) and simply generate the required write-statement
> would be sufficient. Upserts should also be a part of this.
>
> For example:
>
> ```
> PCollection<MyBean> collection = ...;
> collection.apply("Write to database", JdbcIO.writeTable(MyBean.class)
> .withDataSourceConfiguration(mySourceConfiguration)
> .withTableName(myTableName)
> .withUpsertOption(UpsertOption.create()
> .withConflictTarget(keyColumn)
> .withDoUpdate()));
> ```
> This would of course assume that the columns of `myTableName` would match
> the members of `MyBean`.
>
> There are of course technical challenges with this:
> * How to handle situations where the column names do not match the
> input-type
> * How to detect columns from the input-type.
>
> As an alternative, schemas may be an option:
>
> ```
> PCollection<Row> collection = ...;
> collection.apply("Write to database", JdbcIO.writeRows()
> .withSchema(mySchema)
> .withDataSourceConfiguration(mySourceConfiguration)
> .withTableName(myTableName)
> .withUpsertOption(UpsertOption.create()
> .withConflictTarget(keyColumn)
> .withDoUpdate()));
> ```
> This would allow for greater flexibility, but we lose the type-strong
> nature of first suggestion.
>
> I hope this helps.
>
> Best Regards
> Thomas Li Fredriksen
>
> On Fri, Mar 19, 2021 at 7:17 PM Alexey Romanenko 
> wrote:
>
>> Hmm, interesting question. Since we don’t have any answers yet may I ask
>> you a question - do you have an example of what these practices could look
>> like, or how it could be simplified?
>>
>>
>> PS: Not sure that it can help but JdbcIO allows to set a query with
>> “ValueProvider” option which can be helpful to parametrise your transform
>> with values that are only available during pipeline execution and can be
>> used for pipeline templates [1].
>>
>> [1]
>> https://cloud.google.com/dataflow/docs/guides/templates/creating-templates
>>
>> > On 17 Mar 2021, at 14:06, Thomas Fredriksen(External) <
>> thomas.fredrik...@cognite.com> wrote:
>> >
>> > Hello everyone,
>> >
>> > I was wondering what is considered best-practice when writing SQL
>> statements for the JdbcIO connector?
>> >
>> > Hand-writing the statements and subsequent preparedStatementSetter
>> causes a lot of bloat and is not very manageable.
>> >
>> > Thank you/
>> >
>> > Best Regards
>> > Thomas Li Fredriksen
>>
>>


Re: Ref needed for apache beam SQL

2021-03-08 Thread Brian Hulette
The Python SqlTransform was *first* added in Beam 2.22.0, I'd recommend
using at least Beam 2.24.0 though. What version of Beam Python are you
using?

What happens when you try to import it? Is there an error message you can
share?
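
For reference, importing and applying it looks roughly like this (a sketch —
it assumes Beam 2.24.0+ and a local Java install, since SqlTransform expands
through the Java SDK):

```
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform

class Item(typing.NamedTuple):
    name: str
    price: float

coders.registry.register_coder(Item, coders.RowCoder)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([Item('a', 1.0), Item('b', 2.0)]).with_output_types(Item)
        | SqlTransform("SELECT name, price FROM PCOLLECTION WHERE price > 1.5")
        | beam.Map(print)
    )
```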

Brian

On Sun, Mar 7, 2021 at 7:59 PM Thirusenthilkumar 
wrote:

> Dear Apache Beam experts,
>
> Kindly help me to use SQL in apache beam with python SDK.
>
> Best Regards
> Thirusenthilkumar P
>
>
>
> On Saturday, 6 March 2021, 01:04:14 GMT+5:30, Brian Hulette <
> bhule...@apache.org> wrote:
>
>
> Hi Thirusenthilkumar P,
> I assume you're interested in using SQL from the Beam Python SDK? In that
> case you'd just need to import SqlTransform, as seen in this example:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/sql_taxi.py#L39
> Note it relies on the Java SDK, so you will need to have Java installed on
> your machine. For future reference, you may be able to get faster support
> on the user@beam.apache.org mailing list.
>
> Brian
>
> On Thu, Mar 4, 2021 at 7:26 PM Thirusenthilkumar 
> wrote:
>
> Dear Brain,
>
> I was going through one of your videos and it is interesting. I have a couple
> of beam pipelines with the python SDK currently and want to implement beam
> SQL for some transformations. Can you please let me know how to import beam
> SQL?
>
> https://2020.beamsummit.org/sessions/simpler-python-pipelines/
>
> Best Regards
> Thirusenthilkumar P
>
>


Re: A problem with ZetaSQL

2021-03-04 Thread Brian Hulette
Ah, I suspect this is because our ZetaSQL planner only supports 64 bit
integers (see
https://beam.apache.org/documentation/dsls/sql/zetasql/data-types/#integer-type
). +Robin Qiu  maybe we should have a better error
message for this?
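
As a possible workaround (a sketch I haven't verified against this pipeline),
the 32-bit fields could be widened to INT64 before the SqlTransform, e.g. with
the schema Cast transform — field names below are taken from the schema quoted
further down:

```
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Cast;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Widen the int32 columns to int64 so the ZetaSQL planner accepts them.
Schema widened = Schema.builder()
    .addNullableField("column_1", Schema.FieldType.INT64)
    .addNullableField("column_2", Schema.FieldType.INT64)
    .addNullableField("column_3", Schema.FieldType.STRING)
    .build();

PCollection<Row> widenedInput = input.apply(Cast.widening(widened));
widenedInput.apply(SqlTransform.query("SELECT * FROM PCOLLECTION"));
```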

On Thu, Mar 4, 2021 at 5:24 PM Tao Li  wrote:

> Brian the schema is really simple. Just 3 primitive type columns:
>
>
>
> root
>
> |-- column_1: integer (nullable = true)
>
> |-- column_2: integer (nullable = true)
>
> |-- column_3: string (nullable = true)
>
>
>
>
>
> *From: *Brian Hulette 
> *Date: *Thursday, March 4, 2021 at 2:29 PM
> *To: *Tao Li 
> *Cc: *"user@beam.apache.org" 
> *Subject: *Re: A problem with ZetaSQL
>
>
>
> Thanks, It would also be helpful to know what avroSchema is, or at least
> the types of its fields, so we can understand what the schema of the
> PCollection is.
>
>
>
> On Tue, Mar 2, 2021 at 11:00 AM Tao Li  wrote:
>
> Hi Brian,
>
>
>
> Here is my code to create the PCollection.
>
>
>
> PCollection<FileIO.ReadableFile> files = pipeline
>
> .apply(FileIO.match().filepattern(path))
>
> .apply(FileIO.readMatches());
>
>
>
> PCollection<Row> input = files
>
> .apply(ParquetIO.readFiles(avroSchema))
>
> .apply(MapElements
>
> .into(TypeDescriptors.rows())
>
>
> .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
>
> .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
>
>
>
>
>
> *From: *Brian Hulette 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Tuesday, March 2, 2021 at 10:31 AM
> *To: *user 
> *Subject: *Re: A problem with ZetaSQL
>
>
>
> Thanks for reporting this Tao - could you share what the type of your
> input PCollection is?
>
>
>
> On Tue, Mar 2, 2021 at 9:33 AM Tao Li  wrote:
>
> Hi all,
>
>
>
> I was following the instructions from this doc to play with ZetaSQL
> https://beam.apache.org/documentation/dsls/sql/overview/
>
>
>
> The query is really simple:
>
>
>
> options.as(BeamSqlPipelineOptions.class).setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")
>
> input.apply(SqlTransform.query("SELECT * from PCOLLECTION"))
>
>
>
> I am seeing this error with ZetaSQL  :
>
>
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Unknown Calcite type: INTEGER
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlCalciteTranslationUtils.toZetaSqlType(ZetaSqlCalciteTranslationUtils.java:114)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addFieldsToTable(SqlAnalyzer.java:359)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addTableToLeafCatalog(SqlAnalyzer.java:350)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.lambda$createPopulatedCatalog$1(SqlAnalyzer.java:225)
>
> at
> com.google.common.collect.ImmutableList.forEach(ImmutableList.java:406)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.createPopulatedCatalog(SqlAnalyzer.java:225)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:102)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:180)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:168)
>
> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:114)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:140)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
>
>
>
> This query works fine when using Calcite (by just removing setPlannerName
> call). Am I missing anything here? For example I am specifying
> 'com.google.guava:guava:23.0' as the dependency.
>
>
>
> Thanks!
>
>
>
>
>
>


Re: A problem with ZetaSQL

2021-03-04 Thread Brian Hulette
Thanks, It would also be helpful to know what avroSchema is, or at least
the types of its fields, so we can understand what the schema of the
PCollection is.

On Tue, Mar 2, 2021 at 11:00 AM Tao Li  wrote:

> Hi Brian,
>
>
>
> Here is my code to create the PCollection.
>
>
>
> PCollection<FileIO.ReadableFile> files = pipeline
>
> .apply(FileIO.match().filepattern(path))
>
> .apply(FileIO.readMatches());
>
>
>
> PCollection<Row> input = files
>
> .apply(ParquetIO.readFiles(avroSchema))
>
> .apply(MapElements
>
> .into(TypeDescriptors.rows())
>
>
> .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
>
> .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
>
>
>
>
>
> *From: *Brian Hulette 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Tuesday, March 2, 2021 at 10:31 AM
> *To: *user 
> *Subject: *Re: A problem with ZetaSQL
>
>
>
> Thanks for reporting this Tao - could you share what the type of your
> input PCollection is?
>
>
>
> On Tue, Mar 2, 2021 at 9:33 AM Tao Li  wrote:
>
> Hi all,
>
>
>
> I was following the instructions from this doc to play with ZetaSQL
> https://beam.apache.org/documentation/dsls/sql/overview/
>
>
>
> The query is really simple:
>
>
>
> options.as(BeamSqlPipelineOptions.class).setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")
>
> input.apply(SqlTransform.query("SELECT * from PCOLLECTION"))
>
>
>
> I am seeing this error with ZetaSQL  :
>
>
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Unknown Calcite type: INTEGER
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlCalciteTranslationUtils.toZetaSqlType(ZetaSqlCalciteTranslationUtils.java:114)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addFieldsToTable(SqlAnalyzer.java:359)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addTableToLeafCatalog(SqlAnalyzer.java:350)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.lambda$createPopulatedCatalog$1(SqlAnalyzer.java:225)
>
> at
> com.google.common.collect.ImmutableList.forEach(ImmutableList.java:406)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.createPopulatedCatalog(SqlAnalyzer.java:225)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:102)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:180)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:168)
>
> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:114)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:140)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
>
>
>
> This query works fine when using Calcite (by just removing setPlannerName
> call). Am I missing anything here? For example I am specifying
> 'com.google.guava:guava:23.0' as the dependency.
>
>
>
> Thanks!
>
>
>
>
>
>


Re: A problem with ZetaSQL

2021-03-02 Thread Brian Hulette
Thanks for reporting this Tao - could you share what the type of your input
PCollection is?

On Tue, Mar 2, 2021 at 9:33 AM Tao Li  wrote:

> Hi all,
>
>
>
> I was following the instructions from this doc to play with ZetaSQL
> https://beam.apache.org/documentation/dsls/sql/overview/
>
>
>
> The query is really simple:
>
>
>
> options.as(BeamSqlPipelineOptions.class).setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner")
>
> input.apply(SqlTransform.query("SELECT * from PCOLLECTION"))
>
>
>
> I am seeing this error with ZetaSQL  :
>
>
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Unknown Calcite type: INTEGER
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlCalciteTranslationUtils.toZetaSqlType(ZetaSqlCalciteTranslationUtils.java:114)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addFieldsToTable(SqlAnalyzer.java:359)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.addTableToLeafCatalog(SqlAnalyzer.java:350)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.lambda$createPopulatedCatalog$1(SqlAnalyzer.java:225)
>
> at
> com.google.common.collect.ImmutableList.forEach(ImmutableList.java:406)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.createPopulatedCatalog(SqlAnalyzer.java:225)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:102)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:180)
>
> at
> org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:168)
>
> at
> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:114)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:140)
>
> at
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
>
>
>
> This query works fine when using Calcite (by just removing setPlannerName
> call). Am I missing anything here? For example I am specifying
> 'com.google.guava:guava:23.0' as the dependency.
>
>
>
> Thanks!
>
>
>
>
>


Re: Potential bug with BEAM-11460?

2021-03-01 Thread Brian Hulette
I think this is really just a case of an unanticipated use-case. What you
did in https://github.com/apache/beam/pull/13616 is reasonable, you tested
the paths that you thought users would need.
This situation is just a little different because the solution was to add a
"withCoder" method, which was inadvertently mentioned in your error message
due to a copy-paste error.
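
For anyone hitting this in the meantime, the workaround mentioned elsewhere in
this thread — setting the coder on the output PCollection directly — would look
roughly like this (a sketch; avroSchema, path, and pipeline are the same values
used in Tao's snippet below):

```
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);

// Set the RowCoder explicitly instead of relying on coder inference.
PCollection<Row> rows =
    pipeline
        .apply(ParquetIO.parseGenericRecords(
                (GenericRecord record) -> AvroUtils.toBeamRowStrict(record, beamSchema))
            .from(path))
        .setCoder(RowCoder.of(beamSchema));
```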

Brian

On Sun, Feb 28, 2021 at 5:28 AM Anant Damle  wrote:

> Thanks Tao!
> @Brian Hulette  & @Tao Li: I would be curious to
> learn how can one test these kinds of PRs where one is only providing some
> plumbing.
>
> On Sat, Feb 27, 2021 at 1:05 AM Tao Li  wrote:
>
>> Thanks @Anant Damle  for fixing the issue with
>> BEAM-11460 and BEAM-11527 so quickly!
>>
>>
>>
>> *From: *Anant Damle 
>> *Date: *Friday, February 26, 2021 at 6:49 AM
>> *To: *Tao Li 
>> *Cc: *"user@beam.apache.org" , Brian Hulette <
>> bhule...@google.com>
>> *Subject: *Re: Potential bug with BEAM-11460?
>>
>>
>>
>> @Tao Li, I have added the Unit Test for your use-case and in this commit
>> <https://github.com/apache/beam/pull/14078/commits/f5459bb3533194de48712229957a555ef79f17ef>.
>>
>>
>>
>> On Fri, Feb 26, 2021 at 10:13 PM Anant Damle  wrote:
>>
>> Thanks Tao,
>>
>> Let me try and put this as a test-case.
>>
>> I am also looking into BEAM-11527
>> <https://issues.apache.org/jira/browse/BEAM-11527>.
>>
>>
>>
>> Thanks,
>>
>> Anant
>>
>>
>>
>> On Fri, Feb 26, 2021 at 9:30 AM Tao Li  wrote:
>>
>> @Brian Hulette  I think the main issue I am trying
>> to reporting is that I see this error message “Specify it explicitly using
>> withCoder().” But I did not find withCoder() API available from ParquetIO.
>> So maybe we need to add that method.
>>
>> Getting back to your ask, here is roughly the code I was running. Hope
>> this helps.
>>
>> PCollection<Row> inputDataTest =
>> pipeline.apply(ParquetIO.parseGenericRecords(new
>> SerializableFunction<GenericRecord, Row>() {
>>
>> public Row apply(GenericRecord record) {
>>
>> return AvroUtils.toBeamRowStrict(record,
>> null);
>>
>> }
>>
>> })
>>
>> .from(path));
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *From: *Brian Hulette 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Thursday, February 25, 2021 at 3:11 PM
>> *To: *Anant Damle 
>> *Cc: *user 
>> *Subject: *Re: Potential bug with BEAM-11460?
>>
>>
>>
>> Hi Tao,
>> Thanks for reporting this! Could you share more details about your
>> use-case, Anant mentioned that he's having trouble coming up with a test
>> case where inferCoder doesn't work [1].
>>
>>
>>
>> Brian
>>
>> [1] https://github.com/apache/beam/pull/14078#issuecomment-786293576
>>
>>
>>
>> On Wed, Feb 24, 2021 at 6:49 PM Anant Damle  wrote:
>>
>> Hi Brian,
>>
>> I think you are right. Create BEAM-11861
>> <https://issues.apache.org/jira/browse/BEAM-11861>, will send a PR today.

Re: Potential bug with BEAM-11460?

2021-02-25 Thread Brian Hulette
Hi Tao,
Thanks for reporting this! Could you share more details about your
use-case, Anant mentioned that he's having trouble coming up with a test
case where inferCoder doesn't work [1].

Brian

[1] https://github.com/apache/beam/pull/14078#issuecomment-786293576

On Wed, Feb 24, 2021 at 6:49 PM Anant Damle  wrote:

> Hi Brian,
> I think you are right. Create BEAM-11861
> <https://issues.apache.org/jira/browse/BEAM-11861>, will send a PR today.
> Present workaround is to provide .setCoder directly on the Output
> PCollection.
>
> On Thu, Feb 25, 2021 at 5:25 AM Brian Hulette  wrote:
>
>> +Anant Damle  is this an oversight in
>> https://github.com/apache/beam/pull/13616? What would be the right way
>> to fix this?
>>
>> On Tue, Feb 23, 2021 at 5:24 PM Tao Li  wrote:
>>
>>> Hi Beam community,
>>>
>>>
>>>
>>> I cannot log into Beam jira so I am asking this question here. I am
>>> testing this new feature from Beam 2.28 and see below error:
>>>
>>>
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Unable to
>>> infer coder for output of parseFn. Specify it explicitly using withCoder().
>>>
>>> at
>>> org.apache.beam.sdk.io.parquet.ParquetIO$ParseFiles.inferCoder(ParquetIO.java:554)
>>>
>>> at
>>> org.apache.beam.sdk.io.parquet.ParquetIO$ParseFiles.expand(ParquetIO.java:521)
>>>
>>> at
>>> org.apache.beam.sdk.io.parquet.ParquetIO$ParseFiles.expand(ParquetIO.java:483)
>>>
>>> at
>>> org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
>>>
>>>
>>>
>>> However ParquetIO builder does not have this withCoder() method. I think
>>> this error message is mimicking AvroIO:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L1010
>>>
>>>
>>>
>>> Should we add this method to ParquetIO? Thanks!
>>>
>>


Re: Potential bug with BEAM-11460?

2021-02-24 Thread Brian Hulette
+Anant Damle  is this an oversight in
https://github.com/apache/beam/pull/13616? What would be the right way to
fix this?

On Tue, Feb 23, 2021 at 5:24 PM Tao Li  wrote:

> Hi Beam community,
>
>
>
> I cannot log into Beam jira so I am asking this question here. I am
> testing this new feature from Beam 2.28 and see below error:
>
>
>
> Exception in thread "main" java.lang.IllegalArgumentException: Unable to
> infer coder for output of parseFn. Specify it explicitly using withCoder().
>
> at
> org.apache.beam.sdk.io.parquet.ParquetIO$ParseFiles.inferCoder(ParquetIO.java:554)
>
> at
> org.apache.beam.sdk.io.parquet.ParquetIO$ParseFiles.expand(ParquetIO.java:521)
>
> at
> org.apache.beam.sdk.io.parquet.ParquetIO$ParseFiles.expand(ParquetIO.java:483)
>
> at
> org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
>
>
>
> However ParquetIO builder does not have this withCoder() method. I think
> this error message is mimicking AvroIO:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L1010
>
>
>
> Should we add this method to ParquetIO? Thanks!
>


Re: Unit Testing Kafka in Apache Beam

2021-02-09 Thread Brian Hulette
Ah this is getting trickier :)

So in the Cloud PubSub tests that I'm referring to we don't use PAssert, we
instead just inject some messages to one PubSub topic, and write some
output to another PubSub topic that we can then verify. We're also not
doing any verification of event time semantics.
This approach might work for you, but you won't have any direct control
over processing time which I'd imagine could lead to flakiness.

I think the right thing to do here would be to verify the event time
semantics/allowed lateness using TestStream to circumvent the KafkaIO. If
you want an end-to-end test including the KafkaIO, you might just add
another test that verifies the full pipeline with a simpler input.
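
To illustrate, a sketch of exercising lateness with TestStream (element type,
timestamps, and durations here are made up):

```
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

Instant base = new Instant(0L);

TestStream<String> events =
    TestStream.create(StringUtf8Coder.of())
        .addElements(TimestampedValue.of("on-time", base))
        .advanceWatermarkTo(base.plus(Duration.standardMinutes(10)))
        // This element's timestamp is now behind the watermark; whether it is
        // kept or dropped depends on the window's allowed lateness.
        .addElements(TimestampedValue.of("late", base.plus(Duration.standardMinutes(1))))
        .advanceProcessingTime(Duration.standardMinutes(5))
        .advanceWatermarkToInfinity();

PCollection<String> input = pipeline.apply(events);
// ...apply the same windowing/aggregation as the real pipeline, then PAssert
// on the windowed output.
```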

Brian

On Tue, Feb 9, 2021 at 9:15 AM Rion Williams  wrote:

> Hey Brian,
>
> So that’s an approach that was thinking about last night and I’m sure it
> could work (I.e. running the pipeline and shortly after sending through
> messages). I’m assuming that as long as the pipeline is running, I should
> be able to apply a PAssert against it to verify that something happened
> after injecting the data and passing/failing the test.
>
> Another question that seems to be baffling me is - is it possible to
> simulate processing time advances using Kafka in a scenario like this? I
> know that I can use a TestStream with its “advanceProcessingTime()”
> constructs to simulate this in a stream, but doing so in a test scenario
> would circumvent Kafka entirely which seems to defeat the purpose of the
> test.
>
> Ideally what I’d want to accomplish is:
> - define a series of records with appropriate timestamps to send through
> my pipeline
> - window against those, verifying allowed lateness is applied/messages
> dropped accordingly
> - assert against the output of the windows
>
> Thanks so much for the response, I do appreciate it.
>
> On Feb 9, 2021, at 11:08 AM, Brian Hulette  wrote:
>
> 
> Hi Rion,
>
> Can you run the pipeline asynchronously and inject messages after it has
> started? We use this approach for some tests against Cloud PubSub.
> Note if using the DirectRunner you need to set the blockOnRun pipeline
> option to False to do this.
>
> Brian
>
> On Mon, Feb 8, 2021 at 2:10 PM Rion Williams 
> wrote:
>
>> Hey folks,
>>
>> I’ve been working on fleshing out a proof-of-concept pipeline that deals
>> with some out of order data (I.e. mismatching processing times /
>> event-times) and does quite a bit of windowing depending on the data. Most
>> of the work I‘ve done in a lot of streaming systems relies heavily on
>> well-written tests, both unit and integration. Beam’s existing constructs
>> like TestStreams are incredible, however I’ve been trying to more closely
>> mimic a production pipeline.
>>
>> I’ve been using kafka-junit [1] and it’s worked incredibly well and can
>> be consumed by the KafkaIO source as expected. This works great, however it
>> requires that the data is injected into the topic prior to starting the
>> pipeline:
>>
>> ```
>> injectIntoTopic(...)
>>
>> pipeline
>> .apply(
>> KafkaIO.read(...)
>> )
>>
>> pipeline.run()
>> ```
>>
>> This results in all of the data being consumed at once and thus not
>> treating allowed lateness like I would imagine. So my question is this:
>>
>> Is it possible to create a unit test to send records to a Kafka topic
>> with some given (even artificial) delay so that I could verify things like
>> allowed lateness within a pipeline?
>>
>> Essentially like using a TestStream with all of its notions of
>> “advanceProcessingTime” in conjunction with inserting those delayed records
>> into Kafka.
>>
>> Does that make sense? Or should I just rely on the use of a TestStream
>> and circumvent Kafka entirely? It doesn’t seem like the right thing to do,
>> but I don’t currently see a way to work around this (I’ve tried defining
>> timestamps at the ProducerRecord level, adding sleeps between records, etc.)
>>
>> Any advice would be appreciated!
>>
>> [1] : https://github.com/salesforce/kafka-junit
>>
>


Re: Unit Testing Kafka in Apache Beam

2021-02-09 Thread Brian Hulette
Hi Rion,

Can you run the pipeline asynchronously and inject messages after it has
started? We use this approach for some tests against Cloud PubSub.
Note if using the DirectRunner you need to set the blockOnRun pipeline
option to False to do this.
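
Concretely, that might look something like this (a sketch with exception
handling omitted; injectIntoTopic is whatever kafka-junit helper you already
use):

```
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.PipelineResult;
import org.joda.time.Duration;

// Don't block on run() so we can publish to Kafka after the pipeline starts.
options.as(DirectOptions.class).setBlockOnRun(false);

PipelineResult result = pipeline.run();

// Give the KafkaIO source a moment to start polling, then inject test records
// (with whatever artificial timestamps/delays the test needs).
Thread.sleep(5_000);
injectIntoTopic(/* test records */);

// Bound the wait, then assert on whatever the pipeline wrote.
result.waitUntilFinish(Duration.standardSeconds(30));
```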

Brian

On Mon, Feb 8, 2021 at 2:10 PM Rion Williams  wrote:

> Hey folks,
>
> I’ve been working on fleshing out a proof-of-concept pipeline that deals
> with some out of order data (I.e. mismatching processing times /
> event-times) and does quite a bit of windowing depending on the data. Most
> of the work I‘ve done in a lot of streaming systems relies heavily on
> well-written tests, both unit and integration. Beam’s existing constructs
> like TestStreams are incredible, however I’ve been trying to more closely
> mimic a production pipeline.
>
> I’ve been using kafka-junit [1] and it’s worked incredibly well and can be
> consumed by the KafkaIO source as expected. This works great, however it
> requires that the data is injected into the topic prior to starting the
> pipeline:
>
> ```
> injectIntoTopic(...)
>
> pipeline
> .apply(
> KafkaIO.read(...)
> )
>
> pipeline.run()
> ```
>
> This results in all of the data being consumed at once and thus not
> treating allowed lateness like I would imagine. So my question is this:
>
> Is it possible to create a unit test to send records to a Kafka topic with
> some given (even artificial) delay so that I could verify things like
> allowed lateness within a pipeline?
>
> Essentially like using a TestStream with all of its notions of
> “advanceProcessingTime” in conjunction with inserting those delayed records
> into Kafka.
>
> Does that make sense? Or should I just rely on the use of a TestStream and
> circumvent Kafka entirely? It doesn’t seem like the right thing to do, but
> I don’t currently see a way to work around this (I’ve tried defining
> timestamps at the ProducerRecord level, adding sleeps between records, etc.)
>
> Any advice would be appreciated!
>
> [1] : https://github.com/salesforce/kafka-junit
>


Re: Using the Beam Python SDK and PortableRunner with Flink to connect to Kafka with SSL

2021-02-02 Thread Brian Hulette
Hm I would expect that to work. Can you tell what container Flink is using
if it's not using the one you specified? +Chamikara Jayalath
 may have some insight here

Brian

On Tue, Feb 2, 2021 at 3:27 AM Paul Nimusiima  wrote:

> Hello Beam community,
>
> I am wondering whether it is possible to connect to a secure Kafka broker
> with the python beam SDK. So for example, in the code below, how would I
> make the "ssl.truststore.location" and "ssl.keystore.location" accessible
> inside the Java SDK harness which runs the code.
>
> ReadFromKafka(
> consumer_config={
> "bootstrap.servers": "bootstrap-server:17032",
> "security.protocol": "SSL",
> "ssl.truststore.location": "/opt/keys/client.truststore.jks", # 
> how do I make this available to the Java SDK harness
> "ssl.truststore.password": "password",
> "ssl.keystore.type": "PKCS12",
> "ssl.keystore.location": "/opt/keys/client.keystore.p12", # how 
> do I make this available to the Java SDK harness
> "ssl.keystore.password": "password",
> "group.id": "group",
> "basic.auth.credentials.source": "USER_INFO",
> "schema.registry.basic.auth.user.info": "user:password"
> },
> topics=["topic"],
> max_num_records=2,
> # expansion_service="localhost:56938"
> )
>
> I tried building a custom image that has pulls credentials into the
> container based on "apache/beam_java11_sdk:2.27.0" and using the 
> "--sdk_harness_container_image_overrides=.*java.*,{image}:{tag}"
> argument, but it does not appear to be used when the pipeline runs on
> Flink.
>
> Thank you for the great tool and any help or advice is greatly appreciated.
>
> --
> paul
>


Re: Regarding the field ordering after Select.Flattened transform

2021-01-20 Thread Brian Hulette
Gotcha. It would certainly look cleaner, I'd be open to such a change as
long as it doesn't complicate the code or impact performance, but I don't
think we should make any guarantees.

+Reuven Lax  WDYT?

Brian

On Wed, Jan 20, 2021 at 10:14 AM Tao Li  wrote:

> Hi Brian,
>
>
>
> Thanks for your quick response. I totally agree that the we should not
> rely on any assumption on the field order and we can always specify the
> order of the flattened fields as we want. There is no blocker issue for me
> with the current behavior, but I am just wondering if may be convenient in
> some use cases if we can just keep the order (roughly) consistent with the
> order of the parent fields from the original schema.
>
>
>
> *From: *Brian Hulette 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Wednesday, January 20, 2021 at 9:42 AM
> *To: *user 
> *Subject: *Re: Regarding the field ordering after Select.Flattened
> transform
>
>
>
> This does seem like an odd choice, I suspect this was just a matter of
> convenience of implementation since the javadoc makes no claims about field
> order.
>
> In general schema transforms don't take care to maintain a particular
> field order and I'd recommend against relying on it. Instead fields should
> be addressed by name, either with Row.getValue(Sring), or by mapping to a
> user type. Is there a reason you want to rely on a particular field order?
> Maybe when writing to certain IOs field order could be important.
>
>
>
> On Tue, Jan 19, 2021 at 1:36 PM Tao Li  wrote:
>
> Hi community,
>
>
>
> I have been experimenting with Select.Flattened transform and noticed that
> the field order in the flattened schema is not consistent with the order of
> the top level fields from the original schema. For example, in the original
> schema, we have field “foo” as the first field and it has a nested field
> “bar”. After applying the flattening transform, the new field “foo.bar”
> becomes the last field in the flattened schema. Seems like the order of the
> new fields is not that deterministic in the flattened schema. Is this an
> expected behavior? Don’t we guarantee any ordering of the flattened fields
> (e.g. being consistent with the original order)? Thanks!
>
>


Re: Regarding the field ordering after Select.Flattened transform

2021-01-20 Thread Brian Hulette
This does seem like an odd choice, I suspect this was just a matter of
convenience of implementation since the javadoc makes no claims about field
order.

In general schema transforms don't take care to maintain a particular field
order and I'd recommend against relying on it. Instead fields should be
addressed by name, either with Row.getValue(String), or by mapping to a user
type. Is there a reason you want to rely on a particular field order? Maybe
when writing to certain IOs field order could be important.
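
For example, something like this rough sketch (here `input` is any PCollection with the nested schema, and the flattened name "foo.bar" is taken from the example below; check flattened.getSchema() for the exact names your pipeline produces):

```java
PCollection<Row> flattened = input.apply(Select.flattenedSchema());
PCollection<String> bars = flattened.apply(
    "ExtractBar",
    MapElements.into(TypeDescriptors.strings())
        .via((Row row) -> row.getString("foo.bar")));  // name-based, order-independent
```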

On Tue, Jan 19, 2021 at 1:36 PM Tao Li  wrote:

> Hi community,
>
>
>
> I have been experimenting with Select.Flattened transform and noticed that
> the field order in the flattened schema is not consistent with the order of
> the top level fields from the original schema. For example, in the original
> schema, we have field “foo” as the first field and it has a nested field
> “bar”. After applying the flattening transform, the new field “foo.bar”
> becomes the last field in the flattened schema. Seems like the order of the
> new fields are not that deterministic in the flattened schema. Is this an
> expected behavior? Don’t we guarantee any ordering of the flattened fields
> (e.g. being consistent with the original order)? Thanks!
>


Re: Beam with Confluent Schema Registry and protobuf

2021-01-11 Thread Brian Hulette
Thanks Cristian!

+user  in case there are interested parties that
monitor user@ and not dev@

On Fri, Jan 8, 2021 at 8:17 PM Cristian Constantinescu 
wrote:

> Hi everyone,
>
> Beam currently has a dependency on older versions of the Confluent libs.
> It makes it difficult to use Protobufs with the Confluent Schema Registry
> as ConfluentSchemaRegistryDeserializerProvider only supports Avro.
>
> I put up together a very simple project to demo how it can be done without
> touching any files inside of Beam. You can find it here:
> https://github.com/zeidoo/beam-confluent-schema-registry-protobuf
>
> Any comments are welcomed, especially if there are better ways of doing
> things.
>
> Cheers!
>


Re: Quick question regarding ParquetIO

2021-01-07 Thread Brian Hulette
On Wed, Jan 6, 2021 at 11:07 AM Tao Li  wrote:

> Hi Brian,
>
>
>
> Please see my answers inline.
>
>
>
> *From: *Brian Hulette 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Wednesday, January 6, 2021 at 10:43 AM
> *To: *user 
> *Subject: *Re: Quick question regarding ParquetIO
>
>
>
> Hey Tao,
>
> It does look like BEAM-11460 could work for you. Note that it relies on a
> dynamic object, which won't work with schema-aware transforms and
> SqlTransform. It's likely this isn't a problem for you, I just wanted to
> point it out.
>
> [tao] I just need a PCollection from IO. Then I can apply
> below code to enable the schemas transforms (I have verified this code
> works).
>
>
>
> setSchema(
>
>   AvroUtils.toBeamSchema(schema),
>
>   new TypeDescriptor[GenericRecord]() {},
>
>
> AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(schema)),
>
>   AvroUtils.getRowToGenericRecordFunction(schema))
>

This requires specifying the Avro schema, doesn't it?


>
>
>
>
>
> Out of curiosity, for your use-case would it be acceptable if Beam peeked
> at the files at pipeline construction time to determine the schema for you?
> This is what we're doing for the new IOs in the Python SDK's DataFrame API.
> They're based on the pandas read_* methods, and use those methods at
> construction time to determine the schema.
>
>
>
> [taol] If I understand correctly, the behavior of the new dataframe API’s
> you are mentioning is very similar to spark parquet reader’s behaviors. If
> that’s the case, then it’s probably what I am looking for 😊
>
>
>
>
>
>
>
> Brian
>
>
>
> On Wed, Jan 6, 2021 at 10:13 AM Alexey Romanenko 
> wrote:
>
> Hi Tao,
>
>
>
> This jira [1] looks exactly what you are asking but it was merged recently
> (thanks to Anant Damle for working on this!) and it should be available
> only in Beam 2.28.0.
>
>
>
> [1] https://issues.apache.org/jira/browse/BEAM-11460
>
>
>
> Regards,
>
> Alexey
>
>
>
> On 6 Jan 2021, at 18:57, Tao Li  wrote:
>
>
>
> Hi beam community,
>
>
>
> Quick question about ParquetIO
> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>.
> Is there a way to avoid specifying the avro schema when reading parquet
> files? The reason is that we may not know the parquet schema until we read
> the files. In comparison, spark parquet reader
> <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html>
>  does
> not require such a schema specification.
>
>
>
> Please advise. Thanks a lot!
>
>
>
>


Re: Quick question regarding ParquetIO

2021-01-06 Thread Brian Hulette
Hey Tao,

It does look like BEAM-11460 could work for you. Note that it relies on a
dynamic object, which won't work with schema-aware transforms and
SqlTransform. It's likely this isn't a problem for you, I just wanted to
point it out.

Out of curiosity, for your use-case would it be acceptable if Beam peeked
at the files at pipeline construction time to determine the schema for you?
This is what we're doing for the new IOs in the Python SDK's DataFrame API.
They're based on the pandas read_* methods, and use those methods at
construction time to determine the schema.
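
For example, a rough sketch with the DataFrame API (the path is a placeholder, and read_parquet is still part of the experimental DataFrame API):

```python
import apache_beam as beam
from apache_beam.dataframe.io import read_parquet

with beam.Pipeline() as p:
    df = p | read_parquet('gs://my-bucket/data/*.parquet')  # schema inferred from the files
    # df behaves like a deferred pandas DataFrame from here on
```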

Brian

On Wed, Jan 6, 2021 at 10:13 AM Alexey Romanenko 
wrote:

> Hi Tao,
>
> This jira [1] looks exactly what you are asking but it was merged recently
> (thanks to Anant Damle for working on this!) and it should be available
> only in Beam 2.28.0.
>
> [1] https://issues.apache.org/jira/browse/BEAM-11460
>
> Regards,
> Alexey
>
> On 6 Jan 2021, at 18:57, Tao Li  wrote:
>
> Hi beam community,
>
> Quick question about ParquetIO
> .
> Is there a way to avoid specifying the avro schema when reading parquet
> files? The reason is that we may not know the parquet schema until we read
> the files. In comparison, spark parquet reader
>  does
> not require such a schema specification.
>
> Please advise. Thanks a lot!
>
>
>


Re: [VOTE] Release 2.27.0, release candidate #1

2020-12-24 Thread Brian Hulette
+Boyuan Zhang  helped me get to the bottom of the
sql_taxi issue. The problem is with the WriteStringsToPubSub API, which is
deprecated since 2.7.0, but used in the example. Boyuan has [1] out to fix
WriteStringsToPubSub and I just sent [2] to replace WriteStringsToPubSub
with WriteToPubSub in example code. Issue is tracked in [3].

[1] https://github.com/apache/beam/pull/13614
[2] https://github.com/apache/beam/pull/13615
[3] https://issues.apache.org/jira/browse/BEAM-11524
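
The replacement in [2] is roughly the following (sketch; the topic is a placeholder and `messages` stands for a PCollection of strings), since WriteToPubSub expects bytes:

```python
import apache_beam as beam

_ = (
    messages  # PCollection of str
    | 'EncodeUtf8' >> beam.Map(lambda s: s.encode('utf-8'))
    | 'WriteToPubSub' >> beam.io.WriteToPubSub(topic='projects/my-project/topics/my-topic'))
```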

On Thu, Dec 24, 2020 at 8:26 AM Pablo Estrada  wrote:

> Alright! Thanks everyone for your validations. I'm cancelling this RC, and
> I'll perform cherry picks to prepare the next one.
>
> Please update this thread with any other cherry pick requests!
> -P.
>
> On Thu, Dec 24, 2020, 3:17 AM Ismaël Mejía  wrote:
>
>> It might be a good idea to include also:
>>
>> [BEAM-11403] Cache UnboundedReader per UnboundedSourceRestriction in
>> SDF Wrapper DoFn
>> https://github.com/apache/beam/pull/13592
>>
>> So Java development experience is less affected (as with 2.26.0) (There
>> is a flag to exclude but defaults matter).
>>
>> On Thu, Dec 24, 2020 at 2:56 AM Valentyn Tymofieiev 
>> wrote:
>> >
>> > We discovered a regression on CombineFn.from_callable() started in
>> 2.26.0. Even though it's not a regression in 2.27.0, I strongly prefer we
>> fix it in 2.27.0 as it leads to buggy behavior, so I vote -1.
>> >
>> > The fix to release branch is in flight:
>> https://github.com/apache/beam/pull/13613.
>> >
>> >
>> >
>> > On Wed, Dec 23, 2020 at 3:38 PM Brian Hulette 
>> wrote:
>> >>
>> >> -1 (non-binding)
>> >> Good news: I validated a dataframe pipeline on Dataflow which looked
>> good (with expected performance improvements!)
>> >> Bad news: I also tried to run the sql_taxi example pipeline (streaming
>> SQL in python) on Dataflow and ran into PubSub IO related issues. The
>> example fails in the same way with 2.26.0, but it works in 2.25.0. It's
>> possible this is a Dataflow bug and not a Beam one, but I'd like to
>> investigate further to make sure.
>> >>
>> >> On Wed, Dec 23, 2020 at 12:25 PM Kyle Weaver 
>> wrote:
>> >>>
>> >>> +1 (non-binding) Validated wordcount with Python source + Flink and
>> Spark job server jars. Also checked that the ...:sql:udf jar was added and
>> includes our cherry-picks. Thanks Pablo :)
>> >>>
>> >>> On Wed, Dec 23, 2020 at 12:02 PM Ahmet Altay 
>> wrote:
>> >>>>
>> >>>> +1 (binding).
>> >>>>
>> >>>> I validated python quickstarts. Thank you Pablo.
>> >>>>
>> >>>> On Tue, Dec 22, 2020 at 10:04 PM Jean-Baptiste Onofre <
>> j...@nanthrax.net> wrote:
>> >>>>>
>> >>>>> +1 (binding)
>> >>>>>
>> >>>>> Regards
>> >>>>> JB
>> >>>>>
>> >>>>> Le 23 déc. 2020 à 06:46, Pablo Estrada  a
>> écrit :
>> >>>>>
>> >>>>> Hi everyone,
>> >>>>> Please review and vote on the release candidate #1 for the version
>> 2.27.0, as follows:
>> >>>>> [ ] +1, Approve the release
>> >>>>> [ ] -1, Do not approve the release (please provide specific
>> comments)
>> >>>>>
>> >>>>>
>> >>>>> Reviewers are encouraged to test their own use cases with the
>> release candidate, and vote +1
>> >>>>>  if no issues are found.
>> >>>>>
>> >>>>> The complete staging area is available for your review, which
>> includes:
>> >>>>> * JIRA release notes [1],
>> >>>>> * the official Apache source release to be deployed to
>> dist.apache.org [2], which is signed with the key with fingerprint
>> C79DDD47DAF3808F0B9DDFAC02B2D9F742008494 [3],
>> >>>>> * all artifacts to be deployed to the Maven Central Repository [4],
>> >>>>> * source code tag "v2.27.0-RC1" [5],
>> >>>>> * website pull request listing the release [6], publishing the API
>> reference manual [7], and the blog post [8].
>> >>>>> * Python artifacts are deployed along with the source release to
>> the dist.apache.org [2].
>> >>>>> * Validation sheet with a tab for 2.27.0 release to help with
>> validation [9].
>> >>>>> * Docker images published to Docker Hub [10].
>> >>>>>
>> >>>>> The vote will be open for at least 72 hours, but given the
>> holidays, we will likely extend for a few more days. The release will be
>> adopted by majority approval, with at least 3 PMC affirmative votes.
>> >>>>>
>> >>>>> Thanks,
>> >>>>> -P.
>> >>>>>
>> >>>>> [1]
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12349380
>> >>>>> [2] https://dist.apache.org/repos/dist/dev/beam/2.27.0/
>> >>>>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> >>>>> [4]
>> https://repository.apache.org/content/repositories/orgapachebeam-1145/
>> >>>>> [5] https://github.com/apache/beam/tree/v2.27.0-RC1
>> >>>>> [6] https://github.com/apache/beam/pull/13602
>> >>>>> [7] https://github.com/apache/beam-site/pull/610
>> >>>>> [8] https://github.com/apache/beam/pull/13603
>> >>>>> [9]
>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=194829106
>> >>>>> [10] https://hub.docker.com/search?q=apache%2Fbeam&type=image
>> >>>>>
>> >>>>>
>>
>


Re: [VOTE] Release 2.27.0, release candidate #1

2020-12-23 Thread Brian Hulette
-1 (non-binding)
Good news: I validated a dataframe pipeline on Dataflow which looked good
(with expected performance improvements!)
Bad news: I also tried to run the sql_taxi example pipeline (streaming SQL
in python) on Dataflow and ran into PubSub IO related issues. The example
fails in the same way with 2.26.0, but it works in 2.25.0. It's possible
this is a Dataflow bug and not a Beam one, but I'd like to investigate
further to make sure.

On Wed, Dec 23, 2020 at 12:25 PM Kyle Weaver  wrote:

> +1 (non-binding) Validated wordcount with Python source + Flink and Spark
> job server jars. Also checked that the ...:sql:udf jar was added and
> includes our cherry-picks. Thanks Pablo :)
>
> On Wed, Dec 23, 2020 at 12:02 PM Ahmet Altay  wrote:
>
>> +1 (binding).
>>
>> I validated python quickstarts. Thank you Pablo.
>>
>> On Tue, Dec 22, 2020 at 10:04 PM Jean-Baptiste Onofre 
>> wrote:
>>
>>> +1 (binding)
>>>
>>> Regards
>>> JB
>>>
>>> Le 23 déc. 2020 à 06:46, Pablo Estrada  a écrit :
>>>
>>> Hi everyone,
>>> Please review and vote on the release candidate #1 for the version
>>> 2.27.0, as follows:
>>> [ ] +1, Approve the release
>>> [ ] -1, Do not approve the release (please provide specific comments)
>>>
>>>
>>> Reviewers are encouraged to test their own use cases with the release
>>> candidate, and vote +1
>>>  if no issues are found.
>>>
>>> The complete staging area is available for your review, which includes:
>>> * JIRA release notes [1],
>>> * the official Apache source release to be deployed to dist.apache.org [2],
>>> which is signed with the key with fingerprint
>>> C79DDD47DAF3808F0B9DDFAC02B2D9F742008494 [3],
>>> * all artifacts to be deployed to the Maven Central Repository [4],
>>> * source code tag "v2.27.0-RC1" [5],
>>> * website pull request listing the release [6], publishing the API
>>> reference manual [7], and the blog post [8].
>>> * Python artifacts are deployed along with the source release to the
>>> dist.apache.org [2].
>>> * Validation sheet with a tab for 2.27.0 release to help with
>>> validation [9].
>>> * Docker images published to Docker Hub [10].
>>>
>>> The vote will be open for at least 72 hours, but given the holidays, we
>>> will likely extend for a few more days. The release will be adopted by
>>> majority approval, with at least 3 PMC affirmative votes.
>>>
>>> Thanks,
>>> -P.
>>>
>>> [1]
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12349380
>>>
>>> [2] https://dist.apache.org/repos/dist/dev/beam/2.27.0/
>>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>>> [4]
>>> https://repository.apache.org/content/repositories/orgapachebeam-1145/
>>> [5] https://github.com/apache/beam/tree/v2.27.0-RC1
>>> [6] https://github.com/apache/beam/pull/13602
>>> [7] https://github.com/apache/beam-site/pull/610
>>> [8] https://github.com/apache/beam/pull/13603
>>> [9]
>>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=194829106
>>>
>>> [10] https://hub.docker.com/search?q=apache%2Fbeam&type=image
>>>
>>>
>>>


Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Brian Hulette
Reuven, could you clarify what you have in mind? I know multiple times
we've discussed the possibility of adding update compatibility support to
SchemaCoder, including support for certain schema changes (field
additions/deletions) - I think the most recent discussion was here [1].

But it sounds like Talat is asking for something a little beyond that,
effectively a dynamic schema. Is that something you think we can support?

[1]
https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E

On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax  wrote:

> Thanks. It might be theoretically possible to do this (at least for the
> case where existing fields do not change). Whether anyone currently has
> available time to do this is a different question, but it's something that
> can be looked into.
>
> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer 
> wrote:
>
>> Adding new fields is more common than modifying existing fields. But type
>> change is also possible for existing fields, such as regular mandatory
>> field(string,integer) to union(nullable field). No field deletion.
>>
>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax  wrote:
>>
>>> And when you say schema changes, are these new fields being added to the
>>> schema? Or are you making changes to the existing fields?
>>>
>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
 Hi,
 For sure let me explain a little bit about my pipeline.
 My Pipeline is actually simple
 Read Kafka -> Convert Avro Bytes to Beam Row(DoFn,
 Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from
 Row to Avro (DoFn)-> Write DB/GCS/GRPC etc

 On our jobs we have three types of SQLs:
 - SELECT * FROM PCOLLECTION
 - SELECT * FROM PCOLLECTION 
 - SQL Projection with or without Where clause  SELECT col1, col2 FROM
 PCOLLECTION

 We know writerSchema for each message. While deserializing avro binary
 we use writer schema and reader schema on Convert Avro Bytes to Beam Row
 step. It always produces a reader schema's generic record and we convert
 that generic record to Row.
 While submitting DF job we use latest schema to generate beamSchema.

 In the current scenario, when we have schema changes, first we restart
 all 15k jobs with the latest updated schema, then whenever we are done we
 turn on the latest schema for writers. Because of Avro's GrammarResolver[1]
 we read different versions of the schema and we always produce the latest
 schema's record. Without breaking our pipeline we are able to handle
 multiple versions of data in the same streaming pipeline. If we can
 generate SQL's java code when we get notified with the latest schema, we will
 handle all schema changes. The only remaining obstacle is Beam's SQL Java
 code. That's why I am looking for some solution. We don't need multiple
 versions of SQL. We only need to regenerate SQL schema with the latest
 schema on the fly.

 I hope I can explain it :)

 Thanks

 [1]
 https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
 

 On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax  wrote:

> Can you explain the use case some more? Are you wanting to change your
> SQL statement as well when the schema changes? If not, what are those new
> fields doing in the pipeline? What I mean is that your old SQL statement
> clearly didn't reference those fields in a SELECT statement since they
> didn't exist, so what are you missing by not having them unless you are
> also changing the SQL statement?
>
> Is this a case where you have a SELECT *, and just want to make sure
> those fields are included?
>
> Reuven
>
> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
> tuya...@paloaltonetworks.com> wrote:
>
>> Hi Andrew,
>>
>> I assume SQL query is not going to change. Changing things is the Row
>> schema by adding new columns or rename columns. if we keep a version
>> information on somewhere for example a KV pair. Key is schema 
>> information,
>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>> pipelines. When we have a schema change we restart a 15k DF job which is
>> pain. I am looking for a possible way to avoid job restart. Dont you 
>> think
>> it is not still doable ?
>>
>> Thanks
>>
>>
>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud 
>> wrote:
>>
>>> Unfortunately we don't

Re: snowflake io in python

2020-11-20 Thread Brian Hulette
 
> self._obj_type.from_runner_api(116   self._id_to_proto[id], 
> self._pipeline_context)117 return self._id_to_obj[id]
> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in 
> from_runner_api(proto, context)   1216 
> is_python_side_input(side_input_tags[0]) if side_input_tags else False)   
> 1217 -> 1218 transform = ptransform.PTransform.from_runner_api(proto, 
> context)   1219 if uses_python_sideinput_tags:   1220   # Ordering is 
> important here.
> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py
>  in from_runner_api(cls, proto, context)688 if proto is None or 
> proto.spec is None or not proto.spec.urn:689   return None--> 690 
> parameter_type, constructor = cls._known_urns[proto.spec.urn]691 692  
>try:
> KeyError: 'beam:transform:write_files:v1'
>
>
>
> On Fri, Nov 20, 2020 at 11:18 AM Brian Hulette 
> wrote:
>
>> Hm try passing in the args as they would appear in
>> `sys.argv`, PipelineOptions(['--experiments=use_runner_v2'])
>>
>> On Thu, Nov 19, 2020 at 12:14 PM Alan Krumholz 
>> wrote:
>>
>>> How can I pass that flag using the SDK?
>>> Tried this:
>>>
>>> pipeline = beam.Pipeline(options=PipelineOptions(experiments=
>>>> ['use_runner_v2'], ...)
>>>
>>>
>>> but still getting a similar error:
>>>
>>> ---KeyError
>>>   Traceback (most recent call 
>>> last) in > 1 bq_to_snowflake(
>>>   2 'ml-betterup.coach_search.distances',  3 
>>> 'analytics.ml.coach_search_distances',  4 'master')
>>>  in bq_to_snowflake(bq_table, snow_table, 
>>> git_branch)138 )139 --> 140 result = pipeline.run()141  
>>>result.wait_until_finish()142
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in 
>>> run(self, test_runner_api)512   # When possible, invoke a round 
>>> trip through the runner API.513   if test_runner_api and 
>>> self._verify_runner_api_compatible():--> 514 return 
>>> Pipeline.from_runner_api(515 
>>> self.to_runner_api(use_fake_coders=True),516 self.runner,
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in 
>>> from_runner_api(proto, runner, options, return_context, 
>>> allow_proto_holders)890 requirements=proto.requirements)891 
>>> root_transform_id, = proto.root_transform_ids--> 892 
>>> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>> 893 # TODO(robertwb): These are only needed to continue construction. 
>>> Omit?894 p.applied_labels = {
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>>  in get_by_id(self, id)113 # type: (str) -> PortableObjectT114  
>>>if id not in self._id_to_obj:--> 115   self._id_to_obj[id] = 
>>> self._obj_type.from_runner_api(116   self._id_to_proto[id], 
>>> self._pipeline_context)117 return self._id_to_obj[id]
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in 
>>> from_runner_api(proto, context)   1277 result.parts = []   1278 for 
>>> transform_id in proto.subtransforms:-> 1279   part = 
>>> context.transforms.get_by_id(transform_id)   1280   part.parent = 
>>> result   1281   result.parts.append(part)
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>>  in get_by_id(self, id)113 # type: (str) -> PortableObjectT114  
>>>if id not in self._id_to_obj:--> 115   self._id_to_obj[id] = 
>>> self._obj_type.from_runner_api(116   self._id_to_proto[id], 
>>> self._pipeline_context)117 return self._id_to_obj[id]
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in 
>>> from_runner_api(proto, context)   1277 result.parts = []   1278 for 
>>> transform_id in proto.subtransforms:-> 1279   part = 
>>> context.transforms.get_by_id(transform_id)   1280   part.parent = 
>>> result   1281   result.parts.append(part)
>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>>  in get_by_id(self, id)113 # type: (str) -&

Re: snowflake io in python

2020-11-20 Thread Brian Hulette
 (str) -> PortableObjectT114
>  if id not in self._id_to_obj:--> 115   self._id_to_obj[id] = 
> self._obj_type.from_runner_api(116   self._id_to_proto[id], 
> self._pipeline_context)117 return self._id_to_obj[id]
> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in 
> from_runner_api(proto, context)   1216 
> is_python_side_input(side_input_tags[0]) if side_input_tags else False)   
> 1217 -> 1218 transform = ptransform.PTransform.from_runner_api(proto, 
> context)   1219 if uses_python_sideinput_tags:   1220   # Ordering is 
> important here.
> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py
>  in from_runner_api(cls, proto, context)688 if proto is None or 
> proto.spec is None or not proto.spec.urn:689   return None--> 690 
> parameter_type, constructor = cls._known_urns[proto.spec.urn]691 692  
>try:
> KeyError: 'beam:transform:write_files:v1'
>
>
>
> On Thu, Nov 19, 2020 at 2:18 PM Brian Hulette  wrote:
>
>> Ah ok, you'll need to use Dataflow runner v2 [1] to run this pipeline
>> (add the flag '--experiments=use_runner_v2'). See also [2].
>>
>> [1]
>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#python_11
>> [2]
>> https://cloud.google.com/blog/products/data-analytics/multi-language-sdks-for-building-cloud-pipelines
>>
>> On Thu, Nov 19, 2020 at 11:10 AM Alan Krumholz 
>> wrote:
>>
>>> DataFlow runner
>>>
>>> On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette 
>>> wrote:
>>>
>>>> Hm what runner are you using? It looks like we're trying to encode and
>>>> decode the pipeline proto, which isn't possible for a multi-language
>>>> pipeline. Are you using a portable runner?
>>>>
>>>> Brian
>>>>
>>>> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz <
>>>> alan.krumh...@betterup.co> wrote:
>>>>
>>>>> got it, thanks!
>>>>> I was using:
>>>>> 'xx.us-east-1'
>>>>> Seems using this instead fixes that problem:
>>>>> 'xx.us-east-1.snowflakecomputing.com
>>>>>
>>>>> I'm now hitting a different error though (now in python):
>>>>>
>>>>>  in bq_to_snowflake(bq_table,
>>>>>> snow_table, git_branch)
>>>>>> 161 )
>>>>>> 162
>>>>>> --> 163 result = pipeline.run()
>>>>>> 164 result.wait_until_finish()
>>>>>> 165
>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py
>>>>>> in run(self, test_runner_api)
>>>>>> 512 # When possible, invoke a round trip through the runner API.
>>>>>> 513 if test_runner_api and self._verify_runner_api_compatible():
>>>>>> --> 514 return Pipeline.from_runner_api( 515 self.to_runner_api(
>>>>>> use_fake_coders=True),
>>>>>> 516 self.runner,
>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py
>>>>>> in from_runner_api(proto, runner, options, return_context,
>>>>>> allow_proto_holders)
>>>>>> 890 requirements=proto.requirements)
>>>>>> 891 root_transform_id, = proto.root_transform_ids
>>>>>> --> 892 p.transforms_stack = [context.transforms.get_by_id(
>>>>>> root_transform_id)]
>>>>>> 893 # TODO(robertwb): These are only needed to continue
>>>>>> construction. Omit?
>>>>>> 894 p.applied_labels = {
>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>>>>> in get_by_id(self, id)
>>>>>> 113 # type: (str) -> PortableObjectT
>>>>>> 114 if id not in self._id_to_obj:
>>>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>>>> self._id_to_proto[id], self._pipeline_context)
>>>>>> 117 return self._id_to_obj[id]
>>>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py
>>>>>> in from_runner_api(proto, context)
>>>>>> 1277 result.parts = []
>>>>>> 1278 for transform_id in proto.subtransforms:
>>>>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>>>>> 1280 part.parent = result
>>>>&g

Re: snowflake io in python

2020-11-19 Thread Brian Hulette
Ah ok, you'll need to use Dataflow runner v2 [1] to run this pipeline (add
the flag '--experiments=use_runner_v2'). See also [2].

[1]
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#python_11
[2]
https://cloud.google.com/blog/products/data-analytics/multi-language-sdks-for-building-cloud-pipelines
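
Concretely, something like this (sketch; the remaining pipeline options are elided):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--experiments=use_runner_v2',
    # plus --project, --region, --temp_location, etc. as usual
])
pipeline = beam.Pipeline(options=options)
```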

On Thu, Nov 19, 2020 at 11:10 AM Alan Krumholz 
wrote:

> DataFlow runner
>
> On Thu, Nov 19, 2020 at 2:00 PM Brian Hulette  wrote:
>
>> Hm what runner are you using? It looks like we're trying to encode and
>> decode the pipeline proto, which isn't possible for a multi-language
>> pipeline. Are you using a portable runner?
>>
>> Brian
>>
>> On Thu, Nov 19, 2020 at 10:42 AM Alan Krumholz 
>> wrote:
>>
>>> got it, thanks!
>>> I was using:
>>> 'xx.us-east-1'
>>> Seems using this instead fixes that problem:
>>> 'xx.us-east-1.snowflakecomputing.com
>>>
>>> I'm now hitting a different error though (now in python):
>>>
>>>  in bq_to_snowflake(bq_table,
>>>> snow_table, git_branch)
>>>> 161 )
>>>> 162
>>>> --> 163 result = pipeline.run()
>>>> 164 result.wait_until_finish()
>>>> 165 ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py
>>>> in run(self, test_runner_api)
>>>> 512 # When possible, invoke a round trip through the runner API.
>>>> 513 if test_runner_api and self._verify_runner_api_compatible():
>>>> --> 514 return Pipeline.from_runner_api( 515 self.to_runner_api(
>>>> use_fake_coders=True),
>>>> 516 self.runner,
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in
>>>> from_runner_api(proto, runner, options, return_context,
>>>> allow_proto_holders)
>>>> 890 requirements=proto.requirements)
>>>> 891 root_transform_id, = proto.root_transform_ids
>>>> --> 892 p.transforms_stack = [context.transforms.get_by_id(
>>>> root_transform_id)]
>>>> 893 # TODO(robertwb): These are only needed to continue construction.
>>>> Omit?
>>>> 894 p.applied_labels = {
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>>> in get_by_id(self, id)
>>>> 113 # type: (str) -> PortableObjectT
>>>> 114 if id not in self._id_to_obj:
>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>> self._id_to_proto[id], self._pipeline_context)
>>>> 117 return self._id_to_obj[id]
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in
>>>> from_runner_api(proto, context)
>>>> 1277 result.parts = []
>>>> 1278 for transform_id in proto.subtransforms:
>>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>>> 1280 part.parent = result
>>>> 1281 result.parts.append(part)
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>>> in get_by_id(self, id)
>>>> 113 # type: (str) -> PortableObjectT
>>>> 114 if id not in self._id_to_obj:
>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>> self._id_to_proto[id], self._pipeline_context)
>>>> 117 return self._id_to_obj[id]
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in
>>>> from_runner_api(proto, context)
>>>> 1277 result.parts = []
>>>> 1278 for transform_id in proto.subtransforms:
>>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>>> 1280 part.parent = result
>>>> 1281 result.parts.append(part)
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py
>>>> in get_by_id(self, id)
>>>> 113 # type: (str) -> PortableObjectT
>>>> 114 if id not in self._id_to_obj:
>>>> --> 115 self._id_to_obj[id] = self._obj_type.from_runner_api( 116
>>>> self._id_to_proto[id], self._pipeline_context)
>>>> 117 return self._id_to_obj[id]
>>>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in
>>>> from_runner_api(proto, context)
>>>> 1277 result.parts = []
>>>> 1278 for transform_id in proto.subtransforms:
>>>> -> 1279 part = context.transforms.get_by_id(transform_id)
>>>> 1280 part.parent = result
>>>> 1281 result.parts.append(part)
>>>> ~/

Re: snowflake io in python

2020-11-19 Thread Brian Hulette
ython3.8/site-packages/apache_beam/pipeline.py in
>> from_runner_api(proto, context)
>> 1216 is_python_side_input(side_input_tags[0]) if side_input_tags else
>> False)
>> 1217
>> -> 1218 transform = ptransform.PTransform.from_runner_api(proto, context)
>> 1219 if uses_python_sideinput_tags:
>> 1220 # Ordering is important here.
>> ~/opt/anaconda3/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py
>> in from_runner_api(cls, proto, context)
>> 688 if proto is None or proto.spec is None or not proto.spec.urn:
>> 689 return None
>> --> 690 parameter_type, constructor = cls._known_urns[proto.spec.urn]
>> 691
>> 692 try: KeyError: 'beam:transform:write_files:v1'
>
>
>
> I'll keep trying to make this work but sharing it in case you can easily
> see what the problem is
>
> Thanks so much!
>
> On Thu, Nov 19, 2020 at 1:30 PM Brian Hulette  wrote:
>
>> Hi Alan,
>> Sorry this error message is so verbose. What are you passing for the
>> server_name argument [1]? It looks like that's what the Java stacktrace is
>> complaining about:
>>
>> java.lang.IllegalArgumentException: serverName must be in format
>> .snowflakecomputing.com
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py#L302
>>
>> On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz 
>> wrote:
>>
>>> I'm trying to replace my custom/problematic snowflake sink with the new:
>>> https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake
>>>
>>> However when I try to run my pipeline  (using python) I get this Java
>>> error:
>>>
>>> RuntimeError: java.lang.RuntimeException: Failed to build transform
>>>> beam:external:java:snowflake:write:v1 from spec urn:
>>>> "beam:external:java:snowflake:write:v1"
>>>
>>>
>>> It is hard to understand why it is failing from reading the partial java 
>>> error trace I get in the output:
>>>
>>>> at 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130)
>>>>at 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357)
>>>>at 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433)
>>>>at 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488)
>>>>at 
>>>> org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>>>>at 
>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>>>at 
>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>>>at 
>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>>>at 
>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>>>at 
>>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>>>at 
>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>>>>at 
>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>>>>at java.base/java.lang.Thread.run(Thread.java:832)
>>>> Caused by: java.lang.IllegalArgumentException: serverName must be in 
>>>> format .snowflakecomputing.com
>>>>at 
>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
>>>>at 
>>>> org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700)
>>>>at 
>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166)
>>>>at 
>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78)
>>>>at 
>>>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34)
>>>>at 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125)
>>>>... 12 more
>>>
>>>
>>>
>>>  any clue how I can debug/fix this using python?
>>>
>>>
>>>
>>>


Re: snowflake io in python

2020-11-19 Thread Brian Hulette
Hi Alan,
Sorry this error message is so verbose. What are you passing for the
server_name argument [1]? It looks like that's what the Java stacktrace is
complaining about:

java.lang.IllegalArgumentException: serverName must be in format
.snowflakecomputing.com

[1]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/snowflake.py#L302

On Thu, Nov 19, 2020 at 10:16 AM Alan Krumholz 
wrote:

> I'm trying to replace my custom/problematic snowflake sink with the new:
> https://beam.apache.org/documentation/io/built-in/snowflake/#writing-to-snowflake
>
> However when I try to run my pipeline  (using python) I get this Java
> error:
>
> RuntimeError: java.lang.RuntimeException: Failed to build transform
>> beam:external:java:snowflake:write:v1 from spec urn:
>> "beam:external:java:snowflake:write:v1"
>
>
> It is hard to understand why it is failing from reading the partial java 
> error trace I get in the output:
>
>> at 
>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:130)
>>  at 
>> org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:357)
>>  at 
>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:433)
>>  at 
>> org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:488)
>>  at 
>> org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:232)
>>  at 
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
>>  at 
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
>>  at 
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
>>  at 
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>  at 
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>  at 
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>>  at 
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>>  at java.base/java.lang.Thread.run(Thread.java:832)
>> Caused by: java.lang.IllegalArgumentException: serverName must be in format 
>> .snowflakecomputing.com
>>  at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
>>  at 
>> org.apache.beam.sdk.io.snowflake.SnowflakeIO$DataSourceConfiguration.withServerName(SnowflakeIO.java:1700)
>>  at 
>> org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration.getDataSourceConfiguration(CrossLanguageConfiguration.java:166)
>>  at 
>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:78)
>>  at 
>> org.apache.beam.sdk.io.snowflake.crosslanguage.WriteBuilder.buildExternal(WriteBuilder.java:34)
>>  at 
>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:125)
>>  ... 12 more
>
>
>
>  any clue how I can debug/fix this using python?
>
>
>
>


Re: beam rebuilds numpy on pipeline run

2020-10-09 Thread Brian Hulette
+Valentyn Tymofieiev 

This sounds like it's related to ARROW-8983 (pyarrow takes a long time to
download after 0.16.0), discussed on the arrow dev list [2]. I'm not sure
what would've triggered this to start happening for you today though.

[1] https://issues.apache.org/jira/browse/ARROW-8983
[2]
https://lists.apache.org/thread.html/r9baa48a9d1517834c285f0f238f29fcf54405cb7cf1e681314239d7f%40%3Cdev.arrow.apache.org%3E

On Fri, Oct 9, 2020 at 12:10 PM Ross Vandegrift <
ross.vandegr...@cleardata.com> wrote:

> Hello,
>
> Starting today, running a beam pipeline triggers a large reinstallation of
> python modules.  For some reason, it forces full rebuilds from source -
> since
> beam depends on numpy, this takes a long time.
>
> There's nothing strange about my python setup.  I'm using python3.7 on
> debian
> buster with the dataflow runner.  My venv is setup like this:
>  python3 -m venv ~/.venvs/beam
>  . ~/.venvs/beam/bin/activate
>  python3 -m pip install --upgrade wheel
>  python3 -m pip install --upgrade pip setuptools
>  python3 -m pip install -r requirements.txt
>
> My requirements.txt has:
>   apache-beam[gcp]==2.23.0
>   boto3==1.15.0
>
> When it's building, `ps ax | grep python` shows me this:
>   /home/ross/.venvs/beam/bin/python -m pip download --dest /tmp/dataflow-
> requirements-cache -r requirements.txt --exists-action i --no-binary :all:
>
> How do I prevent this?  It's far too slow to develop with, and our
> compliance
> folks are likely to prohibit a tool that silently downloads & builds
> unknown
> code.
>
> Ross
>


Re: Ability to link to "latest" of python docs

2020-09-10 Thread Brian Hulette
There's https://beam.apache.org/releases/pydoc/current (and
https://beam.apache.org/releases/javadoc/current) which redirect to the
most recent release. These get updated somewhere in the release process.

They're not very discoverable since it's a redirect and the URL changes
when you click on it, but maybe that can still work for intersphinx mapping?
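
For intersphinx that would be something like the following conf.py entry (untested; I haven't verified that the objects.inv fetch follows the redirect):

```python
# conf.py
intersphinx_mapping = {
    'apache_beam': ('https://beam.apache.org/releases/pydoc/current/', None),
}
```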

Brian

On Tue, Sep 8, 2020 at 7:54 PM Austin Bennett 
wrote:

> +dev 
>
> Lynn,
>
> Seems totally doable.  If others don't speak up with a good way to do this
> (or in opposition), I'm sure we can sort something out to accomplish this
> (will dig into intersphinx mapping tomorrow).
>
> Cheers,
> Austin
>
>
>
>
> On Tue, Sep 8, 2020, 5:19 PM Lynn Root  wrote:
>
>> Hey folks -
>>
>> I'm wondering if there's a way to link to the latest SDK version of the
>> Python documentation. I see that if I go here
>> , it lists all the available
>> documented SDK versions. But it'd be really nice to go to a link like "
>> https://beam.apache.org/releases/pydoc/latest"; and be automatically
>> pointed to the latest one. This is particularly handy for documenting
>> libraries that use beam via intersphinx mapping
>> .
>>
>> Thanks!
>>
>> --
>> Lynn Root
>> Staff Engineer, Spotify
>>
>


Re: OOM issue on Dataflow Worker by doing string manipulation

2020-09-03 Thread Brian Hulette
You may be able to get some additional insight if you configure Dataflow to
save a heap dump before killing the JVM
(--dumpHeapOnOOM, --saveHeapDumpsToGcsPath) and inspect the dump. There
are directions for that (and a lot of other advice about memory issues) at
[1].
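
If I remember the option names correctly, that looks something like this (sketch; the GCS path is a placeholder):

```java
// Equivalent to --dumpHeapOnOOM=true --saveHeapDumpsToGcsPath=gs://my-bucket/heap-dumps
DataflowPipelineDebugOptions debugOptions =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineDebugOptions.class);
debugOptions.setDumpHeapOnOOM(true);
debugOptions.setSaveHeapDumpsToGcsPath("gs://my-bucket/heap-dumps");
Pipeline p = Pipeline.create(debugOptions);
```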


Another question - is this a batch pipeline? There's an open Jira about
thrashing detection in Dataflow's batch worker [2]. The fact that we shut
down a worker's JVM after sustained periods of thrashing is part of a
larger system for dealing with memory pressure intended for use in
streaming pipelines. We may want to make it opt-in for batch pipelines. I
wrote a PR that made the JVM shutdown opt-in for all Dataflow pipelines
earlier this year, but I closed it when I realized it's an important
feature in streaming [3]. I could revisit that PR and make the feature
opt-in for batch, opt-out for streaming, but that wouldn't help you until
the next Beam release.

Brian

[1]
https://cloud.google.com/community/tutorials/dataflow-debug-oom-conditions
[2] https://issues.apache.org/jira/browse/BEAM-9049
[3] https://github.com/apache/beam/pull/10499#issuecomment-570743842


On Thu, Sep 3, 2020 at 10:48 AM Talat Uyarer 
wrote:

> Hi,
>
> One more update. Sorry When I created a code sample that I shared. I put
> StringBuilder under the setup function but actually it was on the start
> bundle function. So far I tested below scenarios
> - with StringWriter construct object every processElement call
> - with StringBuilder construct object every processElement call
> - with StringBuilder construct object every startBundle call (and also
> tried setLength(0) and delete(0,sb.length() to clean StringBuilder)
>
> None of the cases prevent DF jobs from getting below error.
>
>> Shutting down JVM after 8 consecutive periods of measured GC thrashing.
>> Memory is used/total/max = 4112/5994/5994 MB, GC last/max = 97.36/97.36 %,
>> #pushbacks=3, gc thrashing=true. Heap dump not written.
>
>
> And also my process rate is 4kps per instance. I would like to hear your
> suggestions if you have any.
>
> Thanks
>
> On Wed, Sep 2, 2020 at 6:22 PM Talat Uyarer 
> wrote:
>
>> I also tried Brian's suggestion to clear stringbuilder by calling delete
>> with stringbuffer length. No luck. I am still getting the same error
>> message. Do you have any suggestions ?
>>
>> Thanks
>>
>> On Wed, Sep 2, 2020 at 3:33 PM Talat Uyarer 
>> wrote:
>>
>>> If I'm understanding Talat's logic correctly, it's not necessary to
>>>> reuse the string builder at all in this case.
>>>
>>> Yes. I tried it too. But DF job has the same issue.
>>>
>>>
>>> On Wed, Sep 2, 2020 at 3:17 PM Kyle Weaver  wrote:
>>>
>>>> > It looks like `writer.setLength(0)` may actually allocate a new
>>>> buffer, and then the buffer may also need to be resized as the String
>>>> grows, so you could be creating a lot of orphaned buffers very quickly. I'm
>>>> not that familiar with StringBuilder, is there a way to reset it and re-use
>>>> the existing capacity? Maybe `writer.delete(0, writer.length())` [1]?
>>>>
>>>> If I'm understanding Talat's logic correctly, it's not necessary to
>>>> reuse the string builder at all in this case.
>>>>
>>>> On Wed, Sep 2, 2020 at 3:11 PM Brian Hulette 
>>>> wrote:
>>>>
>>>>> That error isn't exactly an OOM, it indicates the JVM is spending a
>>>>> significant amount of time in garbage collection.
>>>>>
>>>>> It looks like `writer.setLength(0)` may actually allocate a new
>>>>> buffer, and then the buffer may also need to be resized as the String
>>>>> grows, so you could be creating a lot of orphaned buffers very quickly. 
>>>>> I'm
>>>>> not that familiar with StringBuilder, is there a way to reset it and 
>>>>> re-use
>>>>> the existing capacity? Maybe `writer.delete(0, writer.length())` [1]?
>>>>>
>>>>> [1]
>>>>> https://stackoverflow.com/questions/242438/is-it-better-to-reuse-a-stringbuilder-in-a-loop
>>>>>
>>>

Re: OOM issue on Dataflow Worker by doing string manipulation

2020-09-02 Thread Brian Hulette
That error isn't exactly an OOM; it indicates the JVM is spending a
significant amount of time in garbage collection.

It looks like `writer.setLength(0)` may actually allocate a new buffer, and
then the buffer may also need to be resized as the String grows, so you
could be creating a lot of orphaned buffers very quickly. I'm not that
familiar with StringBuilder, is there a way to reset it and re-use the
existing capacity? Maybe `writer.delete(0, writer.length())` [1]?

[1]
https://stackoverflow.com/questions/242438/is-it-better-to-reuse-a-stringbuilder-in-a-loop
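
In DoFn terms the suggestion would look roughly like this (sketch; the element and output types are made up):

```java
static class ConcatFn extends DoFn<String, String> {
  private transient StringBuilder writer;

  @StartBundle
  public void startBundle() {
    writer = new StringBuilder();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    writer.delete(0, writer.length());  // reset, keeping the already-allocated buffer
    writer.append(c.element());
    c.output(writer.toString());
  }
}
```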

On Wed, Sep 2, 2020 at 3:02 PM Talat Uyarer 
wrote:

> Sorry for the wrong import. You can see on the code I am using
> StringBuilder.
>
> On Wed, Sep 2, 2020 at 2:55 PM Ning Kang  wrote:
>
>> Here is a question answered on StackOverflow:
>> https://stackoverflow.com/questions/27221292/when-should-i-use-javas-stringwriter
>> 
>>
>> Could you try using StringBuilder instead since the usage is not
>> appropriate for a StringWriter?
>>
>>
>> On Wed, Sep 2, 2020 at 2:49 PM Talat Uyarer 
>> wrote:
>>
>>> Hi,
>>>
>>> I have an issue with String Concatenating. You can see my code below.[1]
>>> I have a step on my df job which is concatenating strings. But somehow when
>>> I use that step my job starts getting jvm restart errors.
>>>
>>>  Shutting down JVM after 8 consecutive periods of measured GC thrashing.
 Memory is used/total/max = 4112/5994/5994 MB, GC last/max = 97.36/97.36 %,
 #pushbacks=3, gc thrashing=true. Heap dump not written.
>>>
>>>
>>> And also I try to use Avro rather than String. When I use Avro, it works
>>> fine without any issue. Do you have any suggestions?
>>>
>>> Thanks
>>>
>>> [1] https://dpaste.com/7RTV86WQC
>>> 
>>>
>>>
>>>


Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-08-19 Thread Brian Hulette
essElement(ArticlesKafkaToBigQuery.java:414)
>
> So it does not seem to have an effect when the annotation on
> EnrichedArticle is present. Without the annotation however, there is no
> schema defined on the output PCollection, so I have to define it myself for
> the BigQueryIO to work:
> [The code of the EnrichFn transforms an AssetEnvelope to a Java POJO Asset
> class and enriches it via an RPC call, the Asset has a low number of
> fields, so doing the manual mapping here is manageable, even though I would
> like to use
> Beam Schema as soon as this problem here is solved, which would make that
> Asset POJO obsolete.]
>
> PCollection> assets =
> p.apply("Create assets", Create.of(kvAsset));
>
> PCollection> articles =
> p.apply("Create articles", Create.of(kvArticle));
>
> TupleTag articleTag = new TupleTag<>();
> TupleTag assetTag = new TupleTag<>();
>
> PCollection> joinedCollection =
> KeyedPCollectionTuple
> .of(articleTag, articles).and(assetTag,
> assets).apply(CoGroupByKey.create());
>
> PCollection output = joinedCollection
> .apply(ParDo.of(new ArticlesKafkaToBigQuery.EnrichFn(articleTag,
> assetTag)));
> // The following line returns false:
> output.hasSchema()
>
> ...BigQueryIO...
>
> On Tue, Jun 30, 2020 at 5:48 AM Luke Cwik  wrote:
>
>> Can you give context as to whether schemas will ever allow recursive
>> types since this is pretty common in lots of languages?
>>
>> On Mon, Jun 29, 2020 at 5:13 PM Brian Hulette 
>> wrote:
>>
>>> It just occurred to me that BEAM-10265 [1] could be the cause of the
>>> stack overflow. Does ArticleEnvelope refer to itself recursively? Beam
>>> schemas are not allowed to be recursive, and it looks like we don't fail
>>> gracefully for recursive proto definitions.
>>>
>>> Brian
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-10265
>>>
>>> On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette 
>>> wrote:
>>>
>>>> Hm it looks like the error is from trying to call the zero-arg
>>>> constructor for the ArticleEnvelope proto class. Do you have a schema
>>>> registered for ArticleEnvelope?
>>>>
>>>> I think maybe what's happening is Beam finds there's no schema
>>>> registered for ArticleEnvelope, so it just recursively
>>>> applies JavaFieldSchema, which generates code that attempts to use the
>>>> zero-arg constructor. It looks like that's a bug in JavaFieldSchema, we
>>>> should fail earlier with a better message rather than just generating code
>>>> that will try to access a private constructor, I filed a jira for this [1].
>>>>
>>>> I think you can get this working if you register a Schema for
>>>> ArticleEnvelope. I'm not actually sure of the best way to do this since
>>>> it's generated code and you can't use @DefaultSchema (+Reuven Lax
>>>>   and +Alex Van Boxel   in case
>>>> they have better advice), you might try just registering a provider
>>>> manually when you create the pipeline, something like
>>>> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
>>>> new ProtoMessageSchema())`.
>>>>
>>>> Brian
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-10372
>>>>
>>>> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias <
>>>> tobias.kay...@ricardo.ch> wrote:
>>>>
>>>>> A bit more context - I started with the Beam documentation and
>>>>> tried JavaFieldSchema and JavaBeanSchema first, when that didn't work, I
>>>>> dug deeper and tried to implement the methods myself.
>>>>>
>>>>> What I also tried is the following class definition:
>>>>>
>>>>> @DefaultSchema(JavaFieldSchema.class)
>>>>> public class EnrichedArticle implements Serializable {
>>>>>
>>>>>   // ArticleEnvelope is generated from Protobuf
>>>>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>>>>   // Asset is a Java POJO
>>>>>   @Nullable public List assets;
>>>>>
>>>>>   @SchemaCreate
>>>>>   public EnrichedArticle() {}
>>>>>
>>>>>   @SchemaCreate
>>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>>>> List assets) {
>>>&g

Re: Registering Protobuf schema

2020-08-19 Thread Brian Hulette
Ah yes, the SchemaRegistry and SchemaProvider follow the same model, but
none of the SchemaProviders are registered by default. Users can register
the proto schema provider with
registerSchemaProvider(Class) [1]:

  p.getSchemaRegistry().registerSchemaProvider(ProtoMessageSchema.class);

Then SchemaCoder should be used for all proto classes.
We could use ServiceLoader to register all schema providers, then users
wouldn't need to do this. I assume the reason we don't already is because
schemas are still experimental and we want it to be opt-in.

[1]
https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/schemas/SchemaRegistry.html#registerSchemaProvider-org.apache.beam.sdk.schemas.SchemaProvider-

On Wed, Aug 19, 2020 at 8:44 AM Luke Cwik  wrote:

> Brian, Coders have a provider model where the provider can be queried to
> resolve for a given type and the providers are resolved in a specific
> order. This gave the flexibility to handle situations like the one you
> described.
>
> On Wed, Aug 19, 2020 at 12:30 AM 
> wrote:
>
>> Hi Brian,
>>
>>
>>
>> Many thanks for your mail.
>>
>>
>>
>> Yes I figured that one out in the end from the docs, but many thanks for
>> confirming.
>>
>>
>>
>> I did subsequently discover some other issues with protoBuf-derived
>> schemas (essentially they don’t seem to be properly supported by
>> BigQueryIO.Write or allow for optional fields) but I posted a separate
>> message on the dev channel covering this.
>>
>>
>>
>> Kind regards,
>>
>>
>>
>> Rob
>>
>>
>>
>> *From:* Brian Hulette [mailto:bhule...@google.com]
>> *Sent:* 18 August 2020 20:50
>> *To:* user
>> *Subject:* Re: Registering Protobuf schema
>>
>>
>>
>>
>>
>> Hi Robert,
>> Sorry for the late reply on this. I think you should be able to do this
>> by registering it in your pipeline's SchemaRegistry manually, like so:
>>
>>
>>
>>   Pipeline p;
>>
>>   p.getSchemaRegistry().registerSchemaProvider(Fx.class,
>> ProtoMessageSchema.class);
>>
>> Of course this isn't quite as nice as just adding the DefualtSchema
>> annotation to a class you control. Maybe we should consider some global
>> config that would always use schemas for proto-generated classes.
>>
>>
>> Brian
>>
>>
>>
>> On Sun, Jul 12, 2020 at 9:56 AM Kaymak, Tobias 
>> wrote:
>>
>> This sounds like it is related to the problem I'm trying to solve. (In my
>> case I have a Java POJO containing a protobuf-backed class and am trying to
>> generate a Beam Schema from it.)
>>
>> I would be very interested in a solution to this as well :)
>>
>>
>>
>> On Tue, Jul 7, 2020 at 2:22 PM  wrote:
>>
>> Hi All,
>>
>>
>>
>> I have a BEAM pipeline where I am reading data from some parquet files
>> and converting them into a different format based on protobuf generated
>> classes.
>>
>>
>>
>> I wish to associate a schema (derived from the protobuf classes) for my
>> PCollections.  What is the appropriate way to do this with
>> protobuf-generated classes?
>>
>>
>>
>> Code excerpt:
>>
>>
>>
>> PCollection result = input
>> .apply("FXFilePattern", FileIO.match().filepattern(fxDataFilePattern))
>> .apply("FXReadMatches", FileIO.readMatches())
>> .apply("FXReadParquetFile", ParquetIO.readFiles(fxAvroSchema))
>> .apply("MapFXToProto", ParDo.of(MapFXToProto.of()));
>> boolean hasSchema = result.hasSchema();  // returns false
>>
>>
>>
>> With thanks in advance.
>>
>>
>>
>> Kind regards,
>>
>>
>>
>> Rob
>>
>>
>>
>> *Robert Butcher*
>>
>> *Technical Architect | Foundry/SRS | NatWest Markets*
>>
>> WeWork, 10 Devonshire Square, London, EC2M 4AE
>>
>> Mobile +44 (0) 7414 730866 

Re: Registering Protobuf schema

2020-08-18 Thread Brian Hulette
Hi Robert,
Sorry for the late reply on this. I think you should be able to do this by
registering it in your pipeline's SchemaRegistry manually, like so:

  Pipeline p;
  p.getSchemaRegistry().registerSchemaProvider(Fx.class,
ProtoMessageSchema.class);

Of course this isn't quite as nice as just adding the DefualtSchema
annotation to a class you control. Maybe we should consider some global
config that would always use schemas for proto-generated classes.

Brian
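
To flesh that out a little, a sketch that registers the provider for one proto
class and then converts to Rows explicitly. Fx is the proto class from the
question below, fxProtos stands in for whatever PCollection of Fx the pipeline
produces, and the per-class registration form mirrors the one used elsewhere
in this thread (imports: MapElements, TypeDescriptor, Row, Schema,
SerializableFunction, ProtoMessageSchema):

  p.getSchemaRegistry().registerSchemaProvider(Fx.class, new ProtoMessageSchema());

  // Both lookups throw the checked NoSuchSchemaException if nothing is registered.
  Schema fxSchema = p.getSchemaRegistry().getSchema(Fx.class);
  SerializableFunction<Fx, Row> toRow = p.getSchemaRegistry().getToRowFunction(Fx.class);

  PCollection<Row> fxRows =
      fxProtos
          .apply("FxToRow", MapElements.into(TypeDescriptor.of(Row.class)).via(toRow))
          .setRowSchema(fxSchema);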

On Sun, Jul 12, 2020 at 9:56 AM Kaymak, Tobias 
wrote:

> This sounds like it is related to the problem I'm trying to solve. (In my
> case I have a Java POJO containing a protobuf-backed class and am trying to
> generate a Beam Schema from it.)
>
> I would be very interested in a solution to this as well :)
>
> On Tue, Jul 7, 2020 at 2:22 PM  wrote:
>
>> Hi All,
>>
>>
>>
>> I have a BEAM pipeline where I am reading data from some parquet files
>> and converting them into a different format based on protobuf generated
>> classes.
>>
>>
>>
>> I wish to associate a schema (derived from the protobuf classes) for my
>> PCollections.  What is the appropriate way to do this with
>> protobuf-generated classes?
>>
>>
>>
>> Code excerpt:
>>
>>
>>
>> PCollection result = input
>> .apply("FXFilePattern", FileIO.match().filepattern(fxDataFilePattern))
>> .apply("FXReadMatches", FileIO.readMatches())
>> .apply("FXReadParquetFile", ParquetIO.readFiles(fxAvroSchema))
>> .apply("MapFXToProto", ParDo.of(MapFXToProto.of()));
>> boolean hasSchema = result.hasSchema();  // returns false
>>
>>
>>
>> With thanks in advance.
>>
>>
>>
>> Kind regards,
>>
>>
>>
>> Rob
>>
>>
>>
>> *Robert Butcher*
>>
>> *Technical Architect | Foundry/SRS | NatWest Markets*
>>
>> WeWork, 10 Devonshire Square, London, EC2M 4AE
>>
>> Mobile +44 (0) 7414 730866
>>
>>
>>
>>
>


Re: Accumulator with Map field in CombineFn not serializing correctly

2020-08-07 Thread Brian Hulette
Interesting, thanks for following up with the fix. Were you able to find a
way to reproduce this locally, or did it only occur on Dataflow?

Did you have to make a similar change for the HashMap in Accum, or just the
ExpiringLinkHashMap?

Brian
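
For anyone who lands on the same stack trace, a minimal sketch of the shape of
that fix; the key/value types here are hypothetical, since the thread never
shows them:

  import java.io.Serializable;
  import java.util.LinkedHashMap;
  import org.apache.beam.sdk.coders.AvroCoder;
  import org.apache.beam.sdk.coders.DefaultCoder;

  @DefaultCoder(AvroCoder.class)
  static class Accum implements Serializable {
    // Concrete type: Avro's reflection can call LinkedHashMap's no-arg constructor.
    LinkedHashMap<String, Long> recentCounts = new LinkedHashMap<>();

    // Declaring the field as the Map interface is what produced
    // java.lang.NoSuchMethodException: java.util.Map.<init>() on Dataflow.
    // Map<String, Long> recentCounts = new LinkedHashMap<>();

    Accum() {}
  }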

On Fri, Aug 7, 2020 at 9:58 AM Josh  wrote:

> I have resolved this issue now, in case anyone else runs into this problem
> in future, the resolution was simply to use the concrete type for the field
> in the accumulator, rather than Map:
>
> ExpiringLinkedHashMap recentEvents = new 
> ExpiringLinkedHashMap<>()
>
>
> On Thu, Aug 6, 2020 at 3:16 PM Josh  wrote:
>
>> Hi all,
>>
>> In my Beam job I have defined my own CombineFn with an accumulator.
>> Running locally is no problem, but when I run the job on Dataflow I hit an
>> Avro serialization exception:
>> java.lang.NoSuchMethodException: java.util.Map.<init>()
>> java.lang.Class.getConstructor0(Class.java:3082)
>> java.lang.Class.getDeclaredConstructor(Class.java:2178)
>> org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
>>
>> I am using the `@DefaultCoder(AvroCoder.class)` annotation for my
>> accumulator class. Is there anything special I need to do because one of
>> the fields in my accumulator class is a Map? I have pasted an outline of my
>> CombineFn below.
>>
>> Thanks for any help with this!
>>
>> Josh
>>
>> private static class MyCombineFn extends CombineFn> MyCombineFn.Accum, Out> {
>>
>> private static class ExpiringLinkedHashMap extends
>> LinkedHashMap {
>> @Override
>> protected boolean removeEldestEntry(Map.Entry eldest) {
>> return this.size() > 10;
>> }
>> }
>>
>> @DefaultCoder(AvroCoder.class)
>> private static class PartialEventUpdate implements Serializable {
>> Long incrementCountBy = 0L;
>> Map recentEvents = new
>> ExpiringLinkedHashMap<>();
>> Long lastSeenMillis = 0L;
>>
>> PartialEventUpdate() {}
>> }
>>
>> @DefaultCoder(AvroCoder.class)
>> private static class Accum implements Serializable {
>> Map eventIdToUpdate = new
>> HashMap<>();
>>
>> Accum() {}
>> }
>>
>> @Override
>> public MyCombineFn.Accum createAccumulator() {
>> return new MyCombineFn.Accum();
>> }
>>
>> ...
>>
>> }
>>
>


Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-29 Thread Brian Hulette
It just occurred to me that BEAM-10265 [1] could be the cause of the stack
overflow. Does ArticleEnvelope refer to itself recursively? Beam schemas
are not allowed to be recursive, and it looks like we don't fail gracefully
for recursive proto definitions.

Brian

[1] https://issues.apache.org/jira/browse/BEAM-10265
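
As a concrete (and purely hypothetical) illustration of that restriction, a
type that refers to itself cannot be turned into a Beam schema, and schema
inference will keep recursing into the nested field:

  @DefaultSchema(JavaFieldSchema.class)
  public class Category {
    public String name;
    public Category parent;  // recursive reference: Beam schemas must be finite, non-recursive
  }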

On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette  wrote:

> Hm it looks like the error is from trying to call the zero-arg constructor
> for the ArticleEnvelope proto class. Do you have a schema registered for
> ArticleEnvelope?
>
> I think maybe what's happening is Beam finds there's no schema registered
> for ArticleEnvelope, so it just recursively applies JavaFieldSchema, which
> generates code that attempts to use the zero-arg constructor. It looks like
> that's a bug in JavaFieldSchema, we should fail earlier with a better
> message rather than just generating code that will try to access a private
> constructor, I filed a jira for this [1].
>
> I think you can get this working if you register a Schema for
> ArticleEnvelope. I'm not actually sure of the best way to do this since
> it's generated code and you can't use @DefaultSchema (+Reuven Lax
> and +Alex Van Boxel in case they
> have better advice), you might try just registering a provider manually
> when you create the pipeline, something like
> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
> new ProtoMessageSchema())`.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-10372
>
> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias 
> wrote:
>
>> A bit more context - I started with the Beam documentation and
>> tried JavaFieldSchema and JavaBeanSchema first, when that didn't work, I
>> dug deeper and tried to implement the methods myself.
>>
>> What I also tried is the following class definition:
>>
>> @DefaultSchema(JavaFieldSchema.class)
>> public class EnrichedArticle implements Serializable {
>>
>>   // ArticleEnvelope is generated from Protobuf
>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>   // Asset is a Java POJO
>>   @Nullable public List assets;
>>
>>   @SchemaCreate
>>   public EnrichedArticle() {}
>>
>>   @SchemaCreate
>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>> List assets) {
>> this.article = article;
>> this.assets = assets;
>>   }
>> }
>>
>> This throws the following exception:
>>
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>> from class
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>> ...
>> Caused by: java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>> from class
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>> at
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown
>> Source)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>> at
>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.(MutationDetectors.java:115)
>> at
>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>> at
>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>> at
>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>> at
>> org.apach

Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-29 Thread Brian Hulette
>> 1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class)
>> and my constructor with a @SchemaCreate ,I get the following exception:
>>
>> Caused by: java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>> from class
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi
>> at
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$b2RNJqmi.create(Unknown
>> Source)
>>
>> 2. When I annotate the class with @DefaultSchema(JavaBeanSchema.class),
>> make the fields private and generate Getters/Setters I get a StackOverflow
>> error:
>>
>> java.lang.StackOverflowError
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types.getComponentType(Types.java:197)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getComponentType(TypeToken.java:563)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.isArray(TypeToken.java:512)
>> at
>> org.apache.beam.sdk.values.TypeDescriptor.isArray(TypeDescriptor.java:191)
>> at
>> org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:195)
>> at
>> org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:191)
>> at
>> org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:143)
>> at
>> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>> at
>> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>> at
>> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
>> at
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>> at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>> at
>> org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:66)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:88)
>> at
>> org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:162)
>> [...]
>>
>> 2.1 When I make the fields public, the pipeline executes, but the
>> PCollection does not have a schema associated with it, which causes the
>> next pipeline step (BigQueryIO) to fail.
>>
>> I want to try AutoValue as well, but that requires some more changes to
>> my code.
>>
>> - I tried supplying the ProtoMessageSchema().toRowFunction
>> and ProtoMessageSchema().schemaFor() for the Protobuf conversion to the
>> pipeline
>> - I tried writing my own toRow/fromRow/getSchema functions for the
>> EnrichedArticle and supplying that to the pipeline
>>
>> Where can I put the breakpoints to get a better understanding of what is
>> happening here?
>>
>>
>>
>> On Fri, Jun 26, 2020 at 5:55 PM Brian Hulette 
>> wrote:
>>
>>> Hi Tobias,
>>>
>>> You should be able to annotate the EnrichedArticle class with an
>>> @DefaultSchema annotation and Beam will infer a schema for it. You would
>>> need to make some tweaks to the class though to be compatible with the
>>> built-in schema providers: you could make the members public and use
>>> JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or make
>>> it into an AutoValue and use AutoValueSchema.
>>>
>>> Once you do that you should be able to convert a
>>> PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].
>>>
>>> Brian
>>>
>>> [1]
>>> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
>>>
>>> On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias 
>>> wrote:
>>>
>>>> I have the following class definition:
>>>>
>>>> public class EnrichedArticle implements Serializable {
>>>>
>>>>   // ArticleEnvelope is generated via Protobuf
>>>>   private ArticleProto.ArticleEnvelope article;
>>>>   // Asset is a Java POJO
>>>>   private List assets;
>>>>
>>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>>> List assets) {
>>>> this.article = article;
>>>> this.assets = assets;
>>>>   }
>>>> }
>>>>
>>>> I am trying to generate a SerializableFunction and
>>>> a Schema for it so that I can pass it easily to my BigQueryIO at the end of
>>>> my pipeline. Transforming the article to a Row object is straightforward:
>>>>
>>>> First I get the toRow() function for it via the helper:
>>>>
>>>>  new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
>>>>   ArticleProto.ArticleEnvelope.class));
>>>>
>>>> Then I just apply that function to the article field.
>>>> However I don't know how I can manually transform my list of assets (a
>>>> simple Java POJO annotated with: @DefaultSchema(JavaFieldSchema.class)
>>>>
>>>> in my EnrichedArticle container/composition class. What's the
>>>> recommended way of doing this?
>>>>
>>>>
>>>>
>>>>


Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-26 Thread Brian Hulette
Hi Tobias,

You should be able to annotate the EnrichedArticle class with an
@DefaultSchema annotation and Beam will infer a schema for it. You would
need to make some tweaks to the class though to be compatible with the
built-in schema providers: you could make the members public and use
JavaFieldSchema, or add getters/setters and use the JavaBeanSchema, or make
it into an AutoValue and use AutoValueSchema.

Once you do that you should be able to convert a
PCollection<EnrichedArticle> to a PCollection<Row> with Convert.toRows [1].

Brian

[1]
https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/schemas/transforms/Convert.html#toRows--
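
Spelled out, the JavaFieldSchema variant of that suggestion looks roughly like
this, with the caveat (explored later in this thread) that a schema must also
be available for the proto-generated ArticleEnvelope. Here enrichedArticles
stands in for a PCollection<EnrichedArticle>:

  @DefaultSchema(JavaFieldSchema.class)
  public class EnrichedArticle implements Serializable {
    public ArticleProto.ArticleEnvelope article;
    public List<Asset> assets;
  }

  // Later in the pipeline:
  PCollection<Row> rows = enrichedArticles.apply(Convert.toRows());

If the end goal is BigQueryIO, a schema-aware PCollection can then typically
use BigQueryIO.write().useBeamSchema() instead of a hand-written format
function.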

On Fri, Jun 26, 2020 at 3:19 AM Kaymak, Tobias 
wrote:

> I have the following class definition:
>
> public class EnrichedArticle implements Serializable {
>
>   // ArticleEnvelope is generated via Protobuf
>   private ArticleProto.ArticleEnvelope article;
>   // Asset is a Java POJO
>   private List assets;
>
>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List
> assets) {
> this.article = article;
> this.assets = assets;
>   }
> }
>
> I am trying to generate a SerializableFunction and
> a Schema for it so that I can pass it easily to my BigQueryIO at the end of
> my pipeline. Transforming the article to a Row object is straightforward:
>
> First I get the toRow() function for it via the helper:
>
>  new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(
>   ArticleProto.ArticleEnvelope.class));
>
> Then I just apply that function to the article field.
> However I don't know how I can manually transform my list of assets (a
> simple Java POJO annotated with: @DefaultSchema(JavaFieldSchema.class)
>
> in my EnrichedArticle container/composition class. What's the recommended
> way of doing this?
>
>
>
>


Re: Making RPCs in Beam

2020-06-19 Thread Brian Hulette
Kenn wrote a blog post showing how to do batched RPCs with the state and
timer APIs: https://beam.apache.org/blog/timely-processing/

Is that helpful?

Brian
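
For a quick feel of the pattern in that post, a condensed sketch of a
batched-RPC DoFn built on a bag of buffered requests plus a processing-time
flush timer. The input has to be keyed for state to apply; RequestT/ResponseT
are placeholders, callBatchRpc is a hypothetical hook you would implement
against your real service, and the batch size and flush delay are arbitrary:

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.state.BagState;
  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.TimeDomain;
  import org.apache.beam.sdk.state.Timer;
  import org.apache.beam.sdk.state.TimerSpec;
  import org.apache.beam.sdk.state.TimerSpecs;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;
  import org.joda.time.Duration;

  abstract class BatchedRpcFn<RequestT, ResponseT>
      extends DoFn<KV<String, RequestT>, ResponseT> {

    private static final int MAX_BATCH = 100;

    // Buffered requests for the current key. In practice you may need
    // StateSpecs.bag(requestCoder) if the coder cannot be inferred.
    @StateId("buffer")
    private final StateSpec<BagState<RequestT>> bufferSpec = StateSpecs.bag();

    // Guarantees a flush even when a batch never fills up.
    @TimerId("flush")
    private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    // Implement this with your real (batched) client call.
    abstract List<ResponseT> callBatchRpc(List<RequestT> batch);

    @ProcessElement
    public void process(
        @Element KV<String, RequestT> element,
        @StateId("buffer") BagState<RequestT> buffer,
        @TimerId("flush") Timer flush,
        OutputReceiver<ResponseT> out) {
      buffer.add(element.getValue());
      flush.offset(Duration.standardSeconds(10)).setRelative();

      List<RequestT> batch = new ArrayList<>();
      buffer.read().forEach(batch::add);
      if (batch.size() >= MAX_BATCH) {
        callBatchRpc(batch).forEach(out::output);
        buffer.clear();
      }
    }

    @OnTimer("flush")
    public void onFlush(
        @StateId("buffer") BagState<RequestT> buffer, OutputReceiver<ResponseT> out) {
      List<RequestT> batch = new ArrayList<>();
      buffer.read().forEach(batch::add);
      if (!batch.isEmpty()) {
        callBatchRpc(batch).forEach(out::output);
        buffer.clear();
      }
    }
  }

Resetting the timer on every element means the buffer flushes ten seconds
after the last element arrives for a key; Kenn's post walks through variations
on that choice.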

On Thu, Jun 18, 2020 at 5:29 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hello Everyone,
>
> In my pipeline I have to make a *single RPC call* as well as a *Batched
> RPC call* to fetch data for enrichment. I could not find any reference on
> how to make these calls within a pipeline. I am still covering ground
> in Apache Beam and would appreciate it if anyone who has done this could
> share sample code or details on how to do it.
> --
> Thanks,
> Praveen K Viswanathan
>


Re: BEAM-2217 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

2020-06-10 Thread Brian Hulette
> ...NotImplementedError: BEAM-2717
>
> When I was debugging and commenting out the different steps, I noticed the
> location in my code that supposedly throws the error changes. Here it
> complains about the WriteToBigQuery step (batch_size=500) but if I comment
> out that step it just moves on to the one above. It appears it's
> consistently thrown on the last run step (don't know if that's helpful,
> just thought I'd mention it).
>
> After adding beam.typehints.disable_type_annotations() it still throws
> the same error.
>
> Another thing I forgot to mention in my first email is that I registered a
> ProtoCoder as suggested at the bottom of this page (
> https://beam.apache.org/documentation/sdks/python-type-safety/) as:
>
> beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)
>
> Thanks again, really appreciate your help!
> Lien
>
> On Mon, Jun 8, 2020 at 5:26 PM Brian Hulette  wrote:
>
>> Hi Lien,
>>
>> > First time writing the email list, so please tell me if I'm doing this
>> all wrong.
>> Not at all! This is exactly the kind of question this list is for
>>
>> I have a couple of questions that may help us debug:
>> - Can you share the full stacktrace?
>> - What version of Beam are you using?
>>
>> There were some changes to the way we use typehints in the most recent
>> Beam release (2.21) that might be causing this [1]. If you're using 2.21
>> could you try reverting to the old behavior (call
>> `apache_beam.typehints.disable_type_annotations()` before constructing the
>> pipeline) to see if that helps?
>>
>> Thanks,
>> Brian
>>
>> [1] https://beam.apache.org/blog/python-typing/
>>
>> On Mon, Jun 8, 2020 at 4:15 AM Lien Michiels 
>> wrote:
>>
>>>
>>> Hi everyone,
>>>
>>> First time writing the email list, so please tell me if I'm doing this
>>> all wrong.
>>>
>>> I'm building a streaming pipeline to be run on the DataflowRunner that
>>> reads from PubSub and writes to BQ using the Python 3 SDK.
>>>
>>> I can get the pipeline started fine with the DirectRunner, but as soon
>>> as I try to deploy to DataFlow it throws the following error:
>>>
>>> File
>>> "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line
>>> 221, in to_type_hint
>>> raise NotImplementedError('BEAM-2717')
>>>
>>> I've tried narrowing down what exactly could be causing the issue and it
>>> appears to be caused by the second step in my pipeline, which transforms
>>> the bytes read from PubSub to my own internal Proto format:
>>>
>>> def parse_message_blobs(x: bytes) -> ActionWrapper:
>>> action_wrapper = ActionWrapper()
>>> action_wrapper.ParseFromString(x)
>>>
>>> return action_wrapper
>>>
>>> which is applied as a Map step.
>>>
>>> I've added typehints to all downstream steps as follows:
>>> def partition_by_environment(
>>> x: ActionWrapper, num_partitions: int, environments: List[str]
>>> ) -> int:
>>>
>>> I'd really appreciate it if anyone could let me know what I'm doing
>>> wrong, or what exactly is the issue this error is referring to. I read the
>>> JIRA ticket, but did not understand how it is related to my issue here.
>>>
>>> Thanks!
>>> Kind regards,
>>> Lien
>>>
>>


[ANNOUNCE] Beam 2.22.0 Released

2020-06-10 Thread Brian Hulette
The Apache Beam team is pleased to announce the release of version 2.22.0.

Apache Beam is an open source unified programming model to define and
execute data processing pipelines, including ETL, batch and stream
(continuous) processing. See https://beam.apache.org

You can download the release here:

https://beam.apache.org/get-started/downloads/

This release includes bug fixes, features, and improvements detailed on
the Beam blog: https://beam.apache.org/blog/beam-2.22.0/

Thanks to everyone who contributed to this release, and we hope you enjoy
using Beam 2.22.0.
-- Brian Hulette, on behalf of The Apache Beam team


Re: BEAM-2217 NotImplementedError - DataflowRunner parsing Protos from PubSub (Python SDK)

2020-06-08 Thread Brian Hulette
Hi Lien,

> First time writing the email list, so please tell me if I'm doing this
all wrong.
Not at all! This is exactly the kind of question this list is for

I have a couple of questions that may help us debug:
- Can you share the full stacktrace?
- What version of Beam are you using?

There were some changes to the way we use typehints in the most recent Beam
release (2.21) that might be causing this [1]. If you're using 2.21 could
you try reverting to the old behavior (call
`apache_beam.typehints.disable_type_annotations()` before constructing the
pipeline) to see if that helps?

Thanks,
Brian

[1] https://beam.apache.org/blog/python-typing/

On Mon, Jun 8, 2020 at 4:15 AM Lien Michiels 
wrote:

>
> Hi everyone,
>
> First time writing the email list, so please tell me if I'm doing this all
> wrong.
>
> I'm building a streaming pipeline to be run on the DataflowRunner that
> reads from PubSub and writes to BQ using the Python 3 SDK.
>
> I can get the pipeline started fine with the DirectRunner, but as soon as
> I try to deploy to DataFlow it throws the following error:
>
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line
> 221, in to_type_hint
> raise NotImplementedError('BEAM-2717')
>
> I've tried narrowing down what exactly could be causing the issue and it
> appears to be caused by the second step in my pipeline, which transforms
> the bytes read from PubSub to my own internal Proto format:
>
> def parse_message_blobs(x: bytes) -> ActionWrapper:
> action_wrapper = ActionWrapper()
> action_wrapper.ParseFromString(x)
>
> return action_wrapper
>
> which is applied as a Map step.
>
> I've added typehints to all downstream steps as follows:
> def partition_by_environment(
> x: ActionWrapper, num_partitions: int, environments: List[str]
> ) -> int:
>
> I'd really appreciate it if anyone could let me know what I'm doing wrong,
> or what exactly is the issue this error is referring to. I read the JIRA
> ticket, but did not understand how it is related to my issue here.
>
> Thanks!
> Kind regards,
> Lien
>


Re: Best approach for Sql queries in Beam

2020-05-21 Thread Brian Hulette
It might help if you share what exactly you are hoping to improve upon. Do
you want to avoid the separate sessionWindow transforms? Do it all with one
SQL query? Or just have it be generally more concise/readable?

I'd think it would be possible to do this with a single SQL query but I'm
not sure, maybe the windowing would mess things up. Did you try joining all
5 streams with a single query and run into a problem?

Brian
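
For concreteness, the single-query shape would be along these lines. Table and
column names are purely illustrative, all inputs are assumed to be windowed
consistently beforehand, and, as noted above, it is not certain Beam SQL will
accept every combination of joins and aggregations on unbounded inputs:

  PCollection<Row> result =
      PCollectionTuple.of(new TupleTag<>("TABLE1"), rows1)
          .and(new TupleTag<>("TABLE2"), rows2)
          .and(new TupleTag<>("TABLE3"), rows3)
          .and(new TupleTag<>("TABLE4"), rows4)
          .apply(
              SqlTransform.query(
                  "SELECT t1.col1, t1.col2, "
                      + "MAX(CASE WHEN t3.col3 = 'D' THEN t3.col8 END) AS d_col3 "
                      + "FROM TABLE1 t1 "
                      + "JOIN TABLE2 t2 ON t1.col5 = t2.col7 "
                      + "JOIN TABLE3 t3 ON t2.id = t3.id "
                      + "JOIN TABLE4 t4 ON t3.num = t4.tr_num "
                      + "GROUP BY t1.col1, t1.col2"));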

On Tue, May 19, 2020 at 9:38 AM bharghavi vajrala 
wrote:

> Hi All,
>
> Need your inputs on below scenario:
>
> Source : Kafka (Actual source is oracle db, data is pushed to kafka)
> SDK :  Java
> Runner : Flink
> Problem: Subscribe to 5 topics (tables), join them on different keys, and
> group by a few columns.
> Existing solution: Using a session window of 20 seconds, with a different
> transform for every 2 queries, and using the result.
>
> Below is the sample code:
>
> Sessions sessionWindow =
> Sessions.withGapDuration(Duration.standardSeconds((long)
> Integer.parseInt("20")));
>
>   PCollection stream1 =
> PCollectionTuple
> .of(new TupleTag<>("TABLE1"), rows1)
> .and(new TupleTag<>("TABLE2"), rows2)
> .apply("rows1-rows2", SqlTransform.query(
> "select  t1.col1,t1.col2,t2.col5 from "
> "TABLE1 t1  join TABLE2 t2 \n" +
> "on t1.col5 = t2.col7 "
> )
> )
> .apply("window" , Window.into(sessionWindow));
>
>PCollection mergedStream =
> PCollectionTuple
> .of(new TupleTag<>("MERGE-TABLE"), stream1)
> .apply("merge" , SqlTransform.query("select
> col1,col2, \n" +
> "max(case when col3='D' then col8   end)
> as D_col3,\n" +
> "max(case when col3='C' then col8   end)
> as C_col3,\n" +
> "max(case when col6='CP' then col10   end)
> as CP_col6,\n" +
> "max(case when col6='DP' then col10   end)
> as DP_col6\n" +
> "from MERGE-TABLE " +
> "group by  col1,col2\n "
> )).apply("merge-window",
> Window.into(sessionWindow));
>
>  PCollection stream2 =
> PCollectionTuple
> .of(new TupleTag<>("TABLE3"), mergedStream)
> .and(new TupleTag<>("TABLE4"), stream22)
> .apply(
>
>  SqlTransform.query("select distinct c1,c2,c4 from   " +
> "TABLE3 d1
> join TABLE4  d2\n" +
> " on  d1.num= d2.tr_num  "))
> .apply("e-window" , Window.into(sessionWindow));
>
>
> Is there any better approach?
>
> Looking forward for suggestions.
>
> Thanks!!
>
>


Re: GoogleCloudOptions.worker_machine_type = 'n1-highcpu-96'

2020-05-12 Thread Brian Hulette
Hi Eila,

It looks like you're attempting to set the option on the GoogleCloudOptions
class directly, I think you want to set it on an instance of
PipelineOptions that you've viewed as GoogleCloudOptions. Like this example
from
https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#configuring-pipelineoptions-for-execution-on-the-cloud-dataflow-service

# Create and set your PipelineOptions.
options = PipelineOptions(flags=argv)

# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, staging file location, temp file location, and region.
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
...
# Create the Pipeline with the specified options.
p = Pipeline(options=options)

Alternatively you should be able to just specify --worker_machine_type at
the command line if you're parsing the PipelineOptions from sys.argv. Does
that help?

Brian

On Tue, May 12, 2020 at 8:30 AM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hello,
>
> I am trying to check whether the resource settings are actually being
> applied.
> What would be the right way to do it?
> *the code is:*
> GoogleCloudOptions.worker_machine_type = 'n1-highcpu-96'
>
> and *the dataflow view is* the following (nothing reflects the highcpu
> machine).
> Please advise.
>
> Thanks,
> Eila
> Resource metrics
> Current vCPUs: 1
> Total vCPU time: 0.07 vCPU hr
> Current memory: 3.75 GB
> Total memory time: 0.264 GB hr
> Current PD: 250 GB
> Total PD time: 17.632 GB hr
> Current SSD PD: 0 B
> Total SSD PD time: 0 GB hr
>
>
> --
> Eila
> 
> Meetup 
>


Re: Upgrading from 2.15 to 2.19 makes compilation fail on trigger

2020-05-07 Thread Brian Hulette
I filed https://github.com/apache/beam/pull/11622 to add a "did you mean"
to the error message. Also updated the tl;dr in
https://s.apache.org/finishing-triggers-drop-data so people using currently
released SDKs will get to the fix sooner when they visit that link:

TL;DR: Almost all uses of triggers that "finish" are data loss bugs. These
triggers have been disabled since Beam 2.18. Most Beam users will want to
wrap affected triggers in Repeatedly.forever(...).

On Tue, May 5, 2020 at 9:44 AM Kyle Weaver  wrote:

> > Maybe we should add a statement like "did you mean to wrap it in
> Repeatedly.forever?" to the error message
>
> +1. IMO the less indirection between the user and the fix, the better.
>
> On Tue, May 5, 2020 at 12:08 PM Luke Cwik  wrote:
>
>> Pointing users to the website with additional details in the error
>> messages would likely help as well.
>>
>> On Tue, May 5, 2020 at 8:45 AM Brian Hulette  wrote:
>>
>>> In both SDKs this is an unsafe trigger because it will only fire once
>>> for the first window (per key), and any subsequent data on the same key
>>> will be dropped. In 2.18, we closed BEAM-3288 with PR
>>> https://github.com/apache/beam/pull/9960, which detects these cases and
>>> fails early. Probably the fix is to add Repeatedly.forever around your
>>> AfterWatermark trigger.
>>>
>>> This is noted if you read through
>>> https://s.apache.org/finishing-triggers-drop-data but it's not super
>>> clear from a user perspective. Maybe we should add a statement like "did
>>> you mean to wrap it in Repeatedly.forever?" to the error message, and/or
>>> update https://s.apache.org/finishing-triggers-drop-data with clear
>>> directions for users. +Kenneth Knowles 
>>>
>>> On Tue, May 5, 2020 at 5:18 AM Eddy G  wrote:
>>>
>>>> Hey all!
>>>>
>>>> Recently been updating Beam pipelines up to 2.19, and the following
>>>> trigger which previously worked with 2.15 flawlessly has stopped doing so
>>>> and the project doesn't even compile now.
>>>>
>>>> .apply("15min Window",
>>>> Window.into(FixedWindows.of(Duration.standardMinutes(15)))
>>>> .triggering(AfterWatermark
>>>> .pastEndOfWindow())
>>>> .withAllowedLateness(Duration.standardMinutes(60))
>>>> .discardingFiredPanes()
>>>> )
>>>>
>>>> And will complain with the following error.
>>>>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Unsafe
>>>> trigger may lose data, see
>>>> https://s.apache.org/finishing-triggers-drop-data:
>>>> AfterWatermark.pastEndOfWindow()
>>>>
>>>> Reviewing the changelog I don't see any changes regarding
>>>> AfterWatermark. Am I missing something?
>>>>
>>>


Re: Upgrading from 2.15 to 2.19 makes compilation fail on trigger

2020-05-05 Thread Brian Hulette
In both SDKs this is an unsafe trigger because it will only fire once for
the first window (per key), and any subsequent data on the same key will be
dropped. In 2.18, we closed BEAM-3288 with PR
https://github.com/apache/beam/pull/9960, which detects these cases and
fails early. Probably the fix is to add Repeatedly.forever around your
AfterWatermark trigger.

This is noted if you read through
https://s.apache.org/finishing-triggers-drop-data but it's not super clear
from a user perspective. Maybe we should add a statement like "did you mean
to wrap it in Repeatedly.forever?" to the error message, and/or update
https://s.apache.org/finishing-triggers-drop-data with clear directions for
users. +Kenneth Knowles 
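
Applied to the snippet quoted below, the wrapped version would look like this
(everything else carried over unchanged):

  .apply("15min Window",
      Window.into(FixedWindows.of(Duration.standardMinutes(15)))
          .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
          .withAllowedLateness(Duration.standardMinutes(60))
          .discardingFiredPanes())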

On Tue, May 5, 2020 at 5:18 AM Eddy G  wrote:

> Hey all!
>
> Recently been updating Beam pipelines up to 2.19, and the following
> trigger which previously worked with 2.15 flawlessly has stopped doing so
> and the project doesn't even compile now.
>
> .apply("15min Window",
> Window.into(FixedWindows.of(Duration.standardMinutes(15)))
> .triggering(AfterWatermark
> .pastEndOfWindow())
> .withAllowedLateness(Duration.standardMinutes(60))
> .discardingFiredPanes()
> )
>
> And will complain with the following error.
>
> Exception in thread "main" java.lang.IllegalArgumentException: Unsafe
> trigger may lose data, see
> https://s.apache.org/finishing-triggers-drop-data:
> AfterWatermark.pastEndOfWindow()
>
> Reviewing the changelog I don't see any changes regarding AfterWatermark.
> Am I missing something?
>