Re: Apache Beam a Complete Guide - Review?

2020-06-29 Thread Luke Cwik
The author of Apache Beam A Complete Guide does not have good reviews on
Amazon for their other books and, as you mentioned, there are no reviews for
this one.

I would second the Streaming Systems book, as its authors worked directly on
Apache Beam.

On Sun, Jun 28, 2020 at 6:46 PM Wesley Peng  wrote:

> Hi Rion
>
> Rion Williams wrote:
> > I considered that one as well but was in the same boat in terms of not
> > pulling the trigger (lack of reviews, price point, etc.). I eventually
> > landed on Streaming Systems, which I highly, highly recommend if you
> > want to learn more about the Beam model:
> >
> > - http://streamingsystems.net/
> >
> > I don’t think there’s a better book that I’ve come across that focuses
> > on it more (and if there is one, I’d love to know about it). I wrote a
> > blog post that includes a short-review of it if you want a slightly
> > longer summary (http://rion.io/2020/05/09/an-education-in-streaming/),
> > but again - I highly recommend checking it out if you hadn’t already.
>
> Thanks for the answer. I will check the resource you gave above.
>
> Regards.
>


Re: DoFn with SideInput

2020-06-29 Thread Luke Cwik
The UpdateFn won't be invoked until the side input is ready, which requires
either the watermark to pass the end of the global window plus allowed
lateness (to show that the side input is empty) or at least one trigger
firing to populate it with data. See this general section on side inputs[1]
and some useful patterns[2] (there are examples of how to get globally
windowed side inputs to work).

1: https://beam.apache.org/documentation/programming-guide/#side-inputs
2: https://beam.apache.org/documentation/patterns/side-inputs/
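
To illustrate, here is a minimal sketch (all names are placeholders, not taken
from your pipeline) of a globally windowed side input that becomes ready after
its first trigger firing, together with a DoFn that consumes it:

import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

static PCollection<String> useReadySideInput(
    PCollection<String> mainInput, PCollection<String> sideSource) {
  // Re-window the side input source into the global window with a trigger
  // that fires as soon as data arrives, so the View becomes ready without
  // waiting for the end of the global window.
  final PCollectionView<List<String>> sideView =
      sideSource
          .apply(
              "WindowSideInput",
              Window.<String>into(new GlobalWindows())
                  .triggering(
                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                  .withAllowedLateness(Duration.ZERO)
                  .discardingFiredPanes())
          .apply(View.asList());

  // The main input is buffered until the side input has fired at least once.
  return mainInput.apply(
      "UseSideInput",
      ParDo.of(
              new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  List<String> side = c.sideInput(sideView);
                  c.output(c.element() + " (side size=" + side.size() + ")");
                }
              })
          .withSideInputs(sideView));
}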

On Sun, Jun 28, 2020 at 6:24 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

>
> Hi All - I am facing an issue while using *side-input*.
>
> *What am I doing:*
> From my main program, I am calling a custom PTransform with a
> PCollectionView as a parameter. Inside the custom PTransform, I pass the
> PCollectionView as a side-input to a DoFn.
>
> *Issue:*
> When I run the pipeline, I expect the log statement inside my DoFn's
> processElement to be executed, but nothing is logged. If I remove the
> side-input from my DoFn, the log is printed. I suspect it could be related
> to windowing/execution order, or to my side-input somehow being empty. I'd
> appreciate it if you could clarify what is going wrong here.
>
> *Code Structure:*
>
>
> *Main Program:* PCollectionTuple tuple = input.apply(new FirstTx());
>
>  // Get two tuple tags from first transformation
>  PCollection1 = tuple.get(tag1).setCoder(...);
>  PCollection2 = tuple.get(tag2).setCoder(...);
>
>  // Converting PCollection1 to PCollectionView to use as a side-input
>  // Note: I need to introduce a global window here as my source is
>  // unbounded, and when we use View.asList() it does a GroupByKey internally,
>  // which in turn demands a window
>  PView = PCollection1.apply(Window.>into(new
> GlobalWindows()) // Everything into global window.
>
>  .triggering(Repeatedly.forever(DefaultTrigger.of()))
>
>  .discardingFiredPanes()).apply(Values.create()).apply(View.asList());
>
> // Pass PCollectionView to SecondTx as a param
> PCollection3 = PCollection2.apply(new SecondTx(PView));
>
> *SecondTx:*
> Inside my SecondTx, I am getting the PView from constructor (this.PView =
> PView) and calling a DoFn
>
> public PCollection expand(PCollection  >> input) {
> input.apply(ParDo.of(new UpdateFn()).withSideInput("SideInput", PView));
> ...
> }
>
> // DoFn
> class UpdateFn extends DoFn>>,
> CustomObject> {
> @ProcessElement
> public void processElement(@Element Map Map>> input, OutputReceiver out) {
>* Log.of("UpdateFn " + input);*
> out.output(new CustomObject());
> }
> }
>
> --
> Thanks,
> Praveen K Viswanathan
>


KafkaIO does not support add or remove topics

2020-06-29 Thread Talat Uyarer
Hi,

I am using Dataflow. When I update my DF job with a new topic or change the
partition count on an existing topic, I cannot use DF's update function.
Could you help me understand why the update function cannot be used in that
case?

I checked the Beam source code but could not find the right place to read up
on this.

Thanks for your help in advance
Talat


Re: Can SpannerIO read data from different GCP project?

2020-06-29 Thread Luke Cwik
The intent is that you grant the account that is running the Dataflow job
permissions on the resources you want it to access in project B before you
start the pipeline. This allows for much finer-grained access control and the
ability to revoke permissions without having to disable an entire account.

I would take a look at the general IAM and security documentation within
GCP[1] or open up a support case with GCP requesting guidance.

1: https://cloud.google.com/iam
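
Once the permissions are in place, SpannerIO can point at the other project
directly, with no key file needed. A minimal sketch, assuming placeholder
project/instance/database names and the beam-sdks-java-io-google-cloud-platform
dependency:

import com.google.cloud.spanner.Struct;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.values.PCollection;

static PCollection<Struct> readSpannerFromProjectB(Pipeline pipeline) {
  // Credentials come from the account running the job (e.g. the Dataflow
  // worker service account), which must already have Spanner read access in
  // project B.
  return pipeline.apply(
      "ReadSpannerFromProjectB",
      SpannerIO.read()
          .withProjectId("project-b")
          .withInstanceId("my-instance")
          .withDatabaseId("my-database")
          .withQuery("SELECT SingerId, AlbumId, MarketingBudget FROM Albums"));
}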

On Sun, Jun 28, 2020 at 8:56 AM Austin Bennett 
wrote:

> I haven't tried yet, but it looks like the connection string asks for the
> project to be specified.  Based on that (and cross-project access working in
> other circumstances), I would imagine it will work, but...?  Give it a try!
>
> One tricky part might be ensuring proper permissions in both projects
> (without being too open).
>
> On Sat, Jun 27, 2020, 5:46 AM Sheng Yang  wrote:
>
>> Hi,
>>
>> I am working on Beam using the Dataflow engine. Recently I have been working
>> on reading Spanner data from a different project. Say I run my Beam Dataflow
>> job in GCP project A, but the Spanner instance is in GCP project B. I
>> searched the documents, but can't find anything about SpannerIO reading data
>> with custom credential key files. Right now I am considering JdbcIO because
>> it accepts custom credentials as parameters and Spanner also has a JDBC
>> API [1].
>> Is there something wrong in my description, or am I considering the correct
>> approach?
>>
>> String url = "jdbc:cloudspanner:/projects/my_project_id/"
>> + "instances/my_instance_id/"
>> + "databases/my_database_name"
>> + "?credentials=/home/cloudspanner-keys/my-key.json"
>> + ";autocommit=false";
>> try (Connection connection = DriverManager.getConnection(url)) {
>>   try (ResultSet rs = connection.createStatement()
>>       .executeQuery("SELECT SingerId, AlbumId, MarketingBudget FROM Albums")) {
>>     while (rs.next()) {
>>       Long singerId = rs.getLong(1);
>>     }
>>   }
>> }
>>
>>
>> [1]: https://github.com/googleapis/java-spanner-jdbc
>>
>> Thanks,
>> Sheng
>>
>>
>>
>>
>


Re: KafkaIO does not support add or remove topics

2020-06-29 Thread Alexey Romanenko
Yes, it’s a known limitation [1], mostly due to the fact that KafkaIO.Read is
based on the UnboundedSource API and fetches all information about topics and
partitions only once, during the “split" phase [2]. There is ongoing work to make
KafkaIO.Read based on Splittable DoFn [3], which should allow topic/partition
information to be discovered dynamically, without restarting the pipeline.

[1] https://issues.apache.org/jira/browse/BEAM-727
[2] 
https://github.com/apache/beam/blob/8a54c17235f768f089b36265d79e69ee9b27a2ce/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
[3] https://issues.apache.org/jira/browse/BEAM-9977 
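
To illustrate, a minimal sketch (broker and topic names are placeholders): the
topic list handed to KafkaIO below is captured when the pipeline is constructed
and split, which is why it cannot change while the job is running:

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.PCollection;

static PCollection<KafkaRecord<byte[], byte[]>> readFixedTopics(Pipeline pipeline) {
  // Adding "topic-c" (or more partitions) later means submitting a new
  // pipeline, at least until the Splittable DoFn based read lands.
  return pipeline.apply(
      "ReadFromKafka",
      KafkaIO.readBytes()
          .withBootstrapServers("broker-1:9092")
          .withTopics(Arrays.asList("topic-a", "topic-b")));
}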



> On 29 Jun 2020, at 18:14, Talat Uyarer  wrote:
> 
> Hi,
> 
> I am using Dataflow. When I update my DF job with a new topic or update 
> partition count on the same topic. I can not use DF's update function. Could 
> you help me to understand why I can not use the update function for that ? 
> 
> I checked the Beam source code but I could not find the right place to read 
> on the code. 
> 
> Thanks for your help in advance
> Talat



Re: Concurrency issue with KafkaIO

2020-06-29 Thread Alexey Romanenko
I don’t think it’s a known issue. Could you tell us which version of Beam you use?

> On 28 Jun 2020, at 14:43, wang Wu  wrote:
> 
> Hi,
> We run a Beam pipeline on Spark in streaming mode. We subscribe to multiple
> Kafka topics. Our job runs fine until it is under heavy load: millions of
> Kafka messages coming in per second. The exception looks like a concurrency
> issue. Is it a known bug in Beam, or is there some Spark configuration we
> could set to avoid it?
> Our code looks roughly like this:
> For KafkaIO
> input
>.getPipeline()
>.apply(
>"ReadFromKafka",
>KafkaIO.readBytes()
>.withBootstrapServers(XXX)
>.withTopics(YYY)
>.withConsumerConfigUpdates(
>ImmutableMap.of(
>"group.id",
>"ZZZ"))
>.withReadCommitted()
>.commitOffsetsInFinalize())
>.apply(
>"AAA",
>ParDo.of(
>KafkaRecordToFeatureRowDoFn.newBuilder()
>.setSuccessTag(getSuccessTag())
>.setFailureTag(getFailureTag())
>.build())
>.withOutputTags(getSuccessTag(), 
> TupleTagList.of(getFailureTag(;
> Running Pipeline with Spark Runner:
> PipelineResult result = pipeline.run();
> result.waitUntilFinish();
> 
> --
> Exceptions:
> 
> 20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already 
> consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to 
> exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as it 
> was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to 
> exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as 
> it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_87_3 failed due to 
> exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_87_3 could not be removed as 
> it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_93_3 failed due to 
> exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_93_3 could not be removed as 
> it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_98_3 failed due to 
> exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_98_3 could not be removed as 
> it was not found on disk or in memory
> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_102_3 failed due to 
> exception java.lang.IllegalStateException.
> 20/06/26 18:45:28 WARN BlockManager: Block rdd_102_3 could not be removed as 
> it was not found on disk or in memory
> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID 
> 203)
> java.lang.IllegalStateException
>   at java.util.ArrayList$Itr.remove(ArrayList.java:872)
>   at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423)
>   at 
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172)
>   at 
> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>   at 
> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>   at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>   at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>   at 
> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>   at 
> org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>   at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>   at 
> org.apache.spark

Re: KafkaIO does not support add or remove topics

2020-06-29 Thread Talat Uyarer
Thanks Alexey for sharing the tickets and code. I found one workaround to use
the update function: if I generate a different KafkaIO step name for each
submission and provide
--transformNameMapping="{"Kafka_IO_3242/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds":""}"
then my update command succeeds. I have a question about it: is there any way
to prevent data loss? I believe that when I provide that transformNameMapping,
DF destroys the previous KafkaIO along with its state. Does
.commitOffsetsInFinalize() help me prevent data loss? I am OK with small data
duplication.
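
For reference, a minimal sketch of the workaround (step name, brokers and
topics are placeholders): the read step gets a name that is unique per
submission, and offsets are committed back to Kafka on finalize so a
replacement job can resume from the consumer group's committed position:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.PCollection;

static PCollection<KafkaRecord<byte[], byte[]>> readWithUniqueStepName(
    Pipeline pipeline, String submissionId) {
  Map<String, Object> consumerConfig = new HashMap<>();
  consumerConfig.put("group.id", "my-consumer-group");

  // "Kafka_IO_" + submissionId gives the step a new name on every submission,
  // which is what the --transformNameMapping trick above relies on.
  return pipeline.apply(
      "Kafka_IO_" + submissionId,
      KafkaIO.readBytes()
          .withBootstrapServers("broker-1:9092")
          .withTopics(Arrays.asList("topic-a", "topic-b"))
          .withConsumerConfigUpdates(consumerConfig)
          .withReadCommitted()
          // Commit offsets when bundles finalize, so the replacement job can
          // pick up near where the old one stopped (at-least-once; small
          // duplication is still possible).
          .commitOffsetsInFinalize());
}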

Thanks

On Mon, Jun 29, 2020 at 10:17 AM Alexey Romanenko 
wrote:

> Yes, it’s a known limitation [1] mostly due to the fact that KafkaIO.Read
> is based on UnboundedSource API and it fetches all information about topic
> and partitions only once during a “split" phase [2]. There is on-going work
> to make KafkaIO.Read based on Splittable DoFn [3] which should allow to get
> topic/partitions information dynamically, without restarting a pipeline.
>
> [1] https://issues.apache.org/jira/browse/BEAM-727
> 
> [2]
> https://github.com/apache/beam/blob/8a54c17235f768f089b36265d79e69ee9b27a2ce/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
> 
> [3] https://issues.apache.org/jira/browse/BEAM-9977
> 
>
>
> On 29 Jun 2020, at 18:14, Talat Uyarer 
> wrote:
>
> Hi,
>
> I am using Dataflow. When I update my DF job with a new topic or update
> partition count on the same topic. I can not use DF's update function.
> Could you help me to understand why I can not use the update function for
> that ?
>
> I checked the Beam source code but I could not find the right place to
> read on the code.
>
> Thanks for your help in advance
> Talat
>
>
>


Re: Concurrency issue with KafkaIO

2020-06-29 Thread wang Wu
Hi,

We are using version 2.16.0. More about our dependencies:

+- org.apache.beam:beam-sdks-java-core:jar:2.16.0:compile
[INFO] |  +- org.apache.beam:beam-model-job-management:jar:2.16.0:compile
[INFO] |  +- org.apache.beam:beam-vendor-bytebuddy-1_9_3:jar:0.1:compile
[INFO] |  \- org.tukaani:xz:jar:1.8:compile
[INFO] +- org.apache.beam:beam-sdks-java-io-kafka:jar:2.16.0:compile
[INFO] |  \- org.springframework:spring-expression:jar:5.0.13.RELEASE:compile
[INFO] | \- org.springframework:spring-core:jar:5.0.13.RELEASE:compile
[INFO] |\- org.springframework:spring-jcl:jar:5.0.13.RELEASE:compile

+- org.apache.beam:beam-runners-spark:jar:2.16.0:compile
[INFO] |  +- org.apache.beam:beam-runners-core-java:jar:2.16.0:compile
[INFO] |  |  +- org.apache.beam:beam-model-fn-execution:jar:2.16.0:compile
[INFO] |  |  \- org.apache.beam:beam-sdks-java-fn-execution:jar:2.16.0:compile
[INFO] |  \- org.apache.beam:beam-runners-java-fn-execution:jar:2.16.0:compile
[INFO] | \- 
org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:jar:2.16.0:compile

When building the Kafka input source, we subscribe to multiple topics, i.e.
.withTopics(list of multiple topics). On second thought, it might be a memory
issue. Before the job gets killed, at some point I see a Spark log saying that
the executor is out of memory. Let me observe more and get back to you.

Regards
Din 

> On 30 Jun BE 2563, at 00:26, Alexey Romanenko  
> wrote:
> 
> I don’t think it’s a known issue. Could you tell with version of Beam you use?
> 
>> On 28 Jun 2020, at 14:43, wang Wu  wrote:
>> 
>> Hi,
>> We run Beam pipeline on Spark in the streaming mode. We subscribe to 
>> multiple Kafka topics. Our job run fine until it is under heavy load: 
>> millions of Kafka messages coming per seconds. The exception look like 
>> concurrency issue. Is it a known bug in Beam or some Spark configuration we 
>> could do to avoid?
>> Our code roughly look like this
>> For KafkaIO
>> input
>>   .getPipeline()
>>   .apply(
>>   "ReadFromKafka",
>>   KafkaIO.readBytes()
>>   .withBootstrapServers(XXX)
>>   .withTopics(YYY)
>>   .withConsumerConfigUpdates(
>>   ImmutableMap.of(
>>   "group.id",
>>   "ZZZ"))
>>   .withReadCommitted()
>>   .commitOffsetsInFinalize())
>>   .apply(
>>   "AAA",
>>   ParDo.of(
>>   KafkaRecordToFeatureRowDoFn.newBuilder()
>>   .setSuccessTag(getSuccessTag())
>>   .setFailureTag(getFailureTag())
>>   .build())
>>   .withOutputTags(getSuccessTag(), 
>> TupleTagList.of(getFailureTag(;
>> Running Pipeline with Spark Runner:
>> PipelineResult result = pipeline.run();
>> result.waitUntilFinish();
>> 
>> --
>> Exceptions:
>> 
>> 20/06/26 18:45:28 WARN KafkaUnboundedSource: Reader-3: ignoring already 
>> consumed offset 495100013 for gmb-featurestore-FeatureRow-global___HK-1
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_7_3 failed due to 
>> exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_7_3 could not be removed as 
>> it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_47_3 failed due to 
>> exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_47_3 could not be removed as 
>> it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_87_3 failed due to 
>> exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_87_3 could not be removed as 
>> it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_93_3 failed due to 
>> exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_93_3 could not be removed as 
>> it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_98_3 failed due to 
>> exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_98_3 could not be removed as 
>> it was not found on disk or in memory
>> 20/06/26 18:45:28 WARN BlockManager: Putting block rdd_102_3 failed due to 
>> exception java.lang.IllegalStateException.
>> 20/06/26 18:45:28 WARN BlockManager: Block rdd_102_3 could not be removed as 
>> it was not found on disk or in memory
>> 20/06/26 18:45:28 ERROR Executor: Exception in task 3.0 in stage 64.0 (TID 
>> 203)
>> java.lang.IllegalStateException
>>  at java.util.ArrayList$Itr.remove(ArrayList.java:872)
>>  at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.remove(Iterators.java:423)
>>  at 
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:172)
>>  at 
>> org.apache.beam.runners.spark.io.MicrobatchSourc

Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-29 Thread Brian Hulette
Hm it looks like the error is from trying to call the zero-arg constructor
for the ArticleEnvelope proto class. Do you have a schema registered for
ArticleEnvelope?

I think what's happening is that Beam finds there's no schema registered
for ArticleEnvelope, so it just recursively applies JavaFieldSchema, which
generates code that attempts to use the zero-arg constructor. That looks like
a bug in JavaFieldSchema: we should fail earlier with a better message rather
than generating code that will try to access a private constructor. I filed a
jira for this [1].

I think you can get this working if you register a Schema for
ArticleEnvelope. I'm not actually sure of the best way to do this since
it's generated code and you can't use @DefaultSchema (+Reuven Lax and +Alex
Van Boxel, in case they have better advice); you might try just registering a
provider manually when you create the pipeline, something like
`pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
new ProtoMessageSchema())`.

Brian

[1] https://issues.apache.org/jira/browse/BEAM-10372
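
Concretely, something like this minimal sketch, assuming the generated
ArticleProto class from this thread is on the classpath along with the
beam-sdks-java-extensions-protobuf module:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RegisterProtoSchemaExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Tell the registry how to build a schema for the generated proto class so
    // that JavaFieldSchema on EnrichedArticle doesn't fall back to generating
    // code against ArticleEnvelope's private zero-arg constructor.
    pipeline
        .getSchemaRegistry()
        .registerSchemaProvider(ArticleProto.ArticleEnvelope.class, new ProtoMessageSchema());

    // ... build the rest of the pipeline here, then:
    pipeline.run().waitUntilFinish();
  }
}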

On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias 
wrote:

> A bit more context - I started with the Beam documentation and
> tried JavaFieldSchema and JavaBeanSchema first, when that didn't work, I
> dug deeper and tried to implement the methods myself.
>
> What I also tried is the following class definition:
>
> @DefaultSchema(JavaFieldSchema.class)
> public class EnrichedArticle implements Serializable {
>
>   // ArticleEnvelope is generated from Protobuf
>   @Nullable public ArticleProto.ArticleEnvelope article;
>   // Asset is a Java POJO
>   @Nullable public List assets;
>
>   @SchemaCreate
>   public EnrichedArticle() {}
>
>   @SchemaCreate
>   public EnrichedArticle(ArticleProto.ArticleEnvelope article, List
> assets) {
> this.article = article;
> this.assets = assets;
>   }
> }
>
> This throws the following exception:
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalAccessError: tried to access method
> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.()V
> from class
> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
> ...
> Caused by: java.lang.IllegalAccessError: tried to access method
> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.()V
> from class
> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
> at
> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown
> Source)
> at
> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
> at
> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
> at
> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
> at
> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
> at
> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
> at
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.(MutationDetectors.java:115)
> at
> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
> at
> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
> at
> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
> at
> ch.ricardo.di.beam.ArticlesKafkaToBigQuery$EnrichFn.processElement(ArticlesKafkaToBigQuery.java:439)
>
>
> On Sat, Jun 27, 2020 at 11:09 AM Kaymak, Tobias 
> wrote:
>
>> Hi Brian,
>>
>> Thank you for your response.
>>
>> 1. When I annotate the class with @DefaultSchema(JavaFieldSchema.class)
>> and my constructor with a @SchemaCreate ,I get the following exception:
>>
>> Caused by: java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.()V
>> from class
>> ch.ricardo.schemas.d

Re: DoFn with SideInput

2020-06-29 Thread Praveen K Viswanathan
Thank you Luke. I changed DefaultTrigger.of() to
AfterProcessingTime.pastFirstElementInPane() and it worked.

On Mon, Jun 29, 2020 at 9:09 AM Luke Cwik  wrote:

> The UpdateFn won't be invoked till the side input is ready which requires
> either the watermark to pass the end of the global window + allowed
> lateness (to show that the side input is empty) or at least one firing to
> populate it with data. See this general section on side inputs[1] and some
> useful patterns[2] (there are some examples for how to get globally
> windowed side inputs to work).
>
> 1: https://beam.apache.org/documentation/programming-guide/#side-inputs
> 2: https://beam.apache.org/documentation/patterns/side-inputs/
>
> On Sun, Jun 28, 2020 at 6:24 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>>
>> Hi All - I am facing an issue while using *side-input*.
>>
>> *What am I doing:*
>> From my main program, I am calling a custom PTransform with a
>> PCollectionView as parameter. Inside custom PTransform, I am passing the
>> PCollectionView as a side-input to a DoFn.
>>
>> *Issue:*
>> When I run the pipeline, I am expecting the log statement inside my
>> DoFn's processElement to get executed but it is not getting logged. If I
>> remove the side-input to my DoFn then the log is getting printed. I am
>> suspecting whether it could be related to windowing/execution order or my
>> side-input somehow being empty. Appreciate if you can clarify on what is
>> going wrong here.
>>
>> *Code Structure:*
>>
>>
>> *Main Program:* PCollectionTuple tuple = input.apply(new FirstTx());
>>
>>  // Get two tuple tags from first transformation
>>  PCollection1 = tuple.get(tag1).setCoder(...);
>>  PCollection2 = tuple.get(tag2).setCoder(...);
>>
>>  // Converting PCollection1 to PCollectionView to use as a side-input
>>  // Note: I need to introduce a global window here as my source is
>> unbounded and when we use View.asList() it does GroupByKey internally
>>   which inturn demands a window
>>  PView = PCollection1.apply(Window.>into(new
>> GlobalWindows()) // Everything into global window.
>>
>>  .triggering(Repeatedly.forever(DefaultTrigger.of()))
>>
>>  .discardingFiredPanes()).apply(Values.create()).apply(View.asList());
>>
>> // Pass PCollectionView to SecondTx as a param
>> PCollection3 = PCollection2.apply(new SecondTx(PView));
>>
>> *SecondTx:*
>> Inside my SecondTx, I am getting the PView from constructor (this.PView =
>> PView) and calling a DoFn
>>
>> public PCollection expand(PCollection > >> input) {
>> input.apply(ParDo.of(new UpdateFn()).withSideInput("SideInput", PView));
>> ...
>> }
>>
>> // DoFn
>> class UpdateFn extends DoFn> String>>>, CustomObject> {
>> @ProcessElement
>> public void processElement(@Element Map> Map>> input, OutputReceiver out) {
>>* Log.of("UpdateFn " + input);*
>> out.output(new CustomObject());
>> }
>> }
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan


Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-29 Thread Brian Hulette
It just occurred to me that BEAM-10265 [1] could be the cause of the stack
overflow. Does ArticleEnvelope refer to itself recursively? Beam schemas
are not allowed to be recursive, and it looks like we don't fail gracefully
for recursive proto definitions.

Brian

[1] https://issues.apache.org/jira/browse/BEAM-10265
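
For illustration, a hypothetical class (not from this thread) with the kind of
shape that schema inference cannot model today:

import java.util.List;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

@DefaultSchema(JavaFieldSchema.class)
public class Node {
  public String name;
  // The field type refers back to Node itself, so the inferred schema would
  // have to contain itself -- a recursive schema, which Beam rejects.
  public List<Node> children;
}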

On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette  wrote:

> Hm it looks like the error is from trying to call the zero-arg constructor
> for the ArticleEnvelope proto class. Do you have a schema registered for
> ArticleEnvelope?
>
> I think maybe what's happening is Beam finds there's no schema registered
> for ArticleEnvelope, so it just recursively applies JavaFieldSchema, which
> generates code that attempts to use the zero-arg constructor. It looks like
> that's a bug in JavaFieldSchema, we should fail earlier with a better
> message rather than just generating code that will try to access a private
> constructor, I filed a jira for this [1].
>
> I think you can get this working if you register a Schema for
> ArticleEnvelope. I'm not actually sure of the best way to do this since
> it's generated code and you can't use @DefaultSchema (+Reuven Lax
>   and +Alex Van Boxel   in case they
> have better advice), you might try just registering a provider manually
> when you create the pipeline, something like
> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
> new ProtoMessageSchema())`.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-10372
>
> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias 
> wrote:
>
>> A bit more context - I started with the Beam documentation and
>> tried JavaFieldSchema and JavaBeanSchema first, when that didn't work, I
>> dug deeper and tried to implement the methods myself.
>>
>> What I also tried is the following class definition:
>>
>> @DefaultSchema(JavaFieldSchema.class)
>> public class EnrichedArticle implements Serializable {
>>
>>   // ArticleEnvelope is generated from Protobuf
>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>   // Asset is a Java POJO
>>   @Nullable public List assets;
>>
>>   @SchemaCreate
>>   public EnrichedArticle() {}
>>
>>   @SchemaCreate
>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>> List assets) {
>> this.article = article;
>> this.assets = assets;
>>   }
>> }
>>
>> This throws the following exception:
>>
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.()V
>> from class
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>> ...
>> Caused by: java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.()V
>> from class
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>> at
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown
>> Source)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>> at
>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.(MutationDetectors.java:115)
>> at
>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>> at
>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>> at
>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>> at
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>> at
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>> at
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>> at
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>> at
>> ch.ricardo.di.beam.ArticlesKafkaToBigQuery$Enric

Re: Apache Beam a Complete Guide - Review?

2020-06-29 Thread Wesley Peng

Hi Luke

Luke Cwik wrote:
The author for Apache Beam A Complete Guide does not have good reviews 
on Amazon for their other books and as you mentioned no reviews for this 
one.


I would second the Streaming Systems book as the authors directly worked 
on Apache Beam.



So, could the Apache Beam team at Google write a book directly for the project's
users?


Thanks.


Re: Apache Beam a Complete Guide - Review?

2020-06-29 Thread Rion Williams
Hi Wesley,

In essence, yes, that’s what they did.

The three authors of Streaming Systems are folks that work on Google’s Dataflow 
Project, which for all intents and purposes is essentially an implementation of 
the Beam Model. Two of them are members of the Beam PMC (essentially a steering 
committee for the project) and you’ll frequently see them if you are active 
within the mailing lists here.

As mentioned earlier in the thread, I can’t recommend Streaming Systems enough. 
If you are curious about Beam or want to dig into it, it’s the best book you’ll 
find on the topic. 

> On Jun 29, 2020, at 8:50 PM, Wesley Peng  wrote:
> 
> Hi Luke
> 
> Luke Cwik wrote:
>> The author for Apache Beam A Complete Guide does not have good reviews on 
>> Amazon for their other books and as you mentioned no reviews for this one.
>> I would second the Streaming Systems book as the authors directly worked on 
>> Apache Beam.
> 
> 
> So, can Apache beam team in google write a book directly for the project 
> users?
> 
> Thanks.


Re: Apache Beam a Complete Guide - Review?

2020-06-29 Thread Wesley Peng




Rion Williams wrote:

The three authors of Streaming Systems are folks that work on Google’s Dataflow 
Project, which for all intents and purposes is essentially an implementation of 
the Beam Model. Two of them are members of the Beam PMC (essentially a steering 
committee for the project) and you’ll frequently see them if you are active 
within the mailing lists here.

As mentioned earlier in the thread, I can’t recommend Streaming Systems enough. 
If you are curious about Beam or want to dig into it, it’s the best book you’ll 
find on the topic.


Thanks for this info Rion.


Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-06-29 Thread Luke Cwik
Can you give some context as to whether schemas will ever allow recursive
types, since this is pretty common in lots of languages?

On Mon, Jun 29, 2020 at 5:13 PM Brian Hulette  wrote:

> It just occurred to me that BEAM-10265 [1] could be the cause of the stack
> overflow. Does ArticleEnvelope refer to itself recursively? Beam schemas
> are not allowed to be recursive, and it looks like we don't fail gracefully
> for recursive proto definitions.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-10265
>
> On Mon, Jun 29, 2020 at 11:03 AM Brian Hulette 
> wrote:
>
>> Hm it looks like the error is from trying to call the zero-arg
>> constructor for the ArticleEnvelope proto class. Do you have a schema
>> registered for ArticleEnvelope?
>>
>> I think maybe what's happening is Beam finds there's no schema registered
>> for ArticleEnvelope, so it just recursively applies JavaFieldSchema, which
>> generates code that attempts to use the zero-arg constructor. It looks like
>> that's a bug in JavaFieldSchema, we should fail earlier with a better
>> message rather than just generating code that will try to access a private
>> constructor, I filed a jira for this [1].
>>
>> I think you can get this working if you register a Schema for
>> ArticleEnvelope. I'm not actually sure of the best way to do this since
>> it's generated code and you can't use @DefaultSchema (+Reuven Lax
>>   and +Alex Van Boxel   in case they
>> have better advice), you might try just registering a provider manually
>> when you create the pipeline, something like
>> `pipeline.getSchemaRegistry().registerSchemaProvider(ArticleEnvelope.class,
>> new ProtoMessageSchema())`.
>>
>> Brian
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-10372
>>
>> On Sat, Jun 27, 2020 at 2:44 AM Kaymak, Tobias 
>> wrote:
>>
>>> A bit more context - I started with the Beam documentation and
>>> tried JavaFieldSchema and JavaBeanSchema first, when that didn't work, I
>>> dug deeper and tried to implement the methods myself.
>>>
>>> What I also tried is the following class definition:
>>>
>>> @DefaultSchema(JavaFieldSchema.class)
>>> public class EnrichedArticle implements Serializable {
>>>
>>>   // ArticleEnvelope is generated from Protobuf
>>>   @Nullable public ArticleProto.ArticleEnvelope article;
>>>   // Asset is a Java POJO
>>>   @Nullable public List assets;
>>>
>>>   @SchemaCreate
>>>   public EnrichedArticle() {}
>>>
>>>   @SchemaCreate
>>>   public EnrichedArticle(ArticleProto.ArticleEnvelope article,
>>> List assets) {
>>> this.article = article;
>>> this.assets = assets;
>>>   }
>>> }
>>>
>>> This throws the following exception:
>>>
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> java.lang.IllegalAccessError: tried to access method
>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.()V
>>> from class
>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>> ...
>>> Caused by: java.lang.IllegalAccessError: tried to access method
>>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.()V
>>> from class
>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1
>>> at
>>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$9lEH2bA1.create(Unknown
>>> Source)
>>> at
>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>>> at
>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>>> at
>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>>> at
>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>>> at
>>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>> at
>>> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>>> at
>>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>>> at
>>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>>> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>>> at
>>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.(MutationDetectors.java:115)
>>> at
>>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>>> at
>>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>>> at
>>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>>> at
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>> at
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>> at
>>> org.apach

Re:Re: Can SpannerIO read data from different GCP project?

2020-06-29 Thread Sheng Yang
Thanks Austin and Luke for replying to my message.


I did some experiments; these are my code snippets.


Maven:
2.21.0

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-spanner-jdbc</artifactId>
  <version>1.15.0</version>
</dependency>
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-spanner</artifactId>
  <version>1.56.0</version>
</dependency>

Java code:
public class SpannerJdbcToCsvText {


  private static final Logger LOG = 
LoggerFactory.getLogger(SpannerJdbcToCsvText.class);


  public interface SpannerToTextOptions
  extends PipelineOptions,
  SpannerReadOptions,
  JavascriptTextTransformerOptions,
  FilesystemWriteOptions {


  }


  public static void main(String[] args) {
LOG.info("Starting pipeline setup");
PipelineOptionsFactory.register(SpannerToTextOptions.class);
SpannerToTextOptions options =

PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToTextOptions.class);


FileSystems.setDefaultPipelineOptions(options);
Pipeline pipeline = Pipeline.create(options);

// - block 1 start-
// block 1 will print out in logs in my local mac desktop.
List<String> list = new ArrayList<>();
try {
  String projectId = "projectId";
  String instanceId = "instanceId";
  String databaseId = "databaseId";


  String credentialsFile = "/my/mac/local/path/credentials.json";


  try (Connection connection =
   DriverManager.getConnection(
   String.format(
   
"jdbc:cloudspanner:/projects/%s/instances/%s/databases/%s?credentials=%s",
   projectId, instanceId, databaseId, 
credentialsFile))) {
try (Statement statement = connection.createStatement()) {
  try (ResultSet rs = statement.executeQuery("SELECT name from t2")) {
while (rs.next()) {
  list.add(rs.getString(1));
  LOG.info("print outside get value: " + rs.getString(1));
}
  }
}
  }
}
catch(Exception e) {
  LOG.error("", e);
}


PCollection<String> results = 
pipeline.apply(Create.of(list).withType(TypeDescriptor.of(String.class))).setCoder(StringUtf8Coder.of());
// - block 1 end-


// - block 2 start-
// block 2 will print in server logs
results.apply("print value", ParDo.of(new MapFn()));
// - block 2 end-


pipeline.run();
LOG.info("Completed pipeline setup");
  }
}




I ran the program like this:
 mvn compile exec:java \
 -Dexec.mainClass=com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText \
 -Dexec.args="--runner=DataflowRunner \
 --region=us-central1"


These logs were printed in my local console:
org.apache.beam.runners.dataflow.DataflowRunner  - PipelineOptions.filesToStage 
was not specified. Defaulting to files from the classpath: will stage 351 files. 
Enable logging at DEBUG level to see which files will be staged.
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText  - print outside get value: 
myname
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText  - print outside get value: 2
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText  - print outside get value: 3
com.stubhub.de.dataflow.batch.SpannerJdbcToCsvText  - print outside get value: 4
org.apache.beam.runners.dataflow.DataflowRunner  - Executing pipeline on the 
Dataflow Service, which will have billing implications related to Google 
Compute Engine usage and other Google Cloud Services.
org.apache.beam.runners.dataflow.util.PackageUtil  - Uploading 351 files from 
PipelineOptions.filesToStage to staging location to prepare for execution.
org.apache.beam.runners.dataflow.util.PackageUtil  - Uploading 
/Users/shengyang/ws/Stubhub-DataPlatform.dataworks-ingestion/target/classes to 
gs://dataflow-staging-us-central1-661544897337/temp/staging/classes-KrjSD-Y0s4i28kG-XmiBiw.jar


These logs were printed on the GCP servers:


2020-06-30 09:44:57.483 HKT
Finished processing stage F0 with 0 errors in 0.28 seconds
2020-06-30 09:44:59.600 HKT
Starting MapTask stage s01
2020-06-30 09:45:00.916 HKT
in mapfn - get value:myname
2020-06-30 09:45:00.934 HKT
Finished processing stage s01 with 0 errors in 1.333 seconds
2020-06-30 09:45:03.025 HKT
Starting MapTask stage s01
2020-06-30 09:45:03.046 HKT
in mapfn - get value:4
2020-06-30 09:45:03.047 HKT
Finished processing stage s01 with 0 errors in 0.022 seconds
2020-06-30 09:45:05.148 HKT
Starting MapTask stage s01
2020-06-30 09:45:05.166 HKT
in mapfn - get value:2
2020-06-30 09:45:05.176 HKT
Finished processing stage s01 with 0 errors in 0.028 seconds


Why does the Spanner JDBC call (in block 1) happen on my local machine during
the compile/submit phase, while MapFn (in block 2) happens on the server side?
I expected all of it to happen on the server side.









At 2020-06-30 00:17:51, "Luke Cwik"  wrote:

The intent is that you grant permissions to the account that is running the 
Dataflow job to the resources you want it to access in project B before you 
start the pipeline. Thi