Re: (python) Using ToList output as input for AsSingleton or AsList in Apache Beam

2019-04-17 Thread Heejong Lee
Looks like your code is not structured correctly. It's not possible to chain
a PCollection to another PCollection with the vertical bar. Each chain should
start from an initial Pipeline or PCollection and generate another PCollection
by applying a PTransform.

If you modify your code like below, it should work:

import apache_beam as beam

def m(x, u):
    print(u)
    return x

p = beam.Pipeline()

data_beam = beam.Create(['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'])

# Each chain starts from the pipeline (or an existing PCollection) and
# produces a new PCollection.
chain_1 = p | data_beam | beam.combiners.Count.PerElement()
chain_2 = chain_1 | beam.Map(lambda x: x[0]) | beam.combiners.ToList()

# chain_2 is a PCollection, so it can be wrapped with AsSingleton and
# passed to Map as a side input.
chain_total = chain_1 | beam.Map(m, beam.pvalue.AsSingleton(chain_2))

chain_total | beam.Map(print)

p.run()


Thanks,

On Tue, Apr 16, 2019 at 12:28 PM Paul, Austin 
wrote:

> I encountered this error when writing my own Beam pipeline, and was
> frustrated at the lack of documentation/support as to what was wrong.
> Here's what I think the issue is (apologies if it is totally misguided):
>
> I find it helpful to think of PCollections as nouns and Transforms as
> verbs. Hopefully this adds clarity, but if it doesn't feel free to
> substitute the proper jargon in the explanation below.
> Applying a verb (or multiple verbs) to a noun will yield another noun.
> Applying verbs to verbs will just get you another verb.
> The only noun you start out with is p, your initial Pipeline object.
> You can use this p with verbs to create other nouns, as you do with
> chain1, which is a noun.
> chain2, on the other hand, is a verb applied to a verb, so it's just a verb.
> Since there's no actual content there, it can't be turned into a side
> input; only nouns can do that.
>
> Chain_total is a noun. Did you mean to pass that instead?
>
> Best,
> Austin
>
>
>
> -Original Message-
> From: bruno.filipe.silva.dias@james.finance
> 
> Sent: Tuesday, April 16, 2019 12:36 PM
> To: user@beam.apache.org
> Subject: Re: (python) Using ToList output as input for AsSingleton or
> AsList in Apache Beam
>
>
>
> On 2019/04/16 16:32:46, Bruno Dias 
> wrote:
> > I'm getting an unexpected error when I try to use the output of
> > beam.combiners.ToList as the input of beam.pvalue.AsSingleton or
> > beam.pvalue.AsList in order to experiment with side inputs. I was able
> > to use single numbers (e.g.: the mean of a list) as a side input but,
> > for lists and dictionaries, I'm getting exceptions. For
> > beam.pvalue.AsSingleton, I get this:
> > https://www.pastiebin.com/5cb475355dedf
> >
> > For beam.pvalue.AsList, I get https://www.pastiebin.com/5cb475f0e7f96.
> >
> > This is the code I'm running: https://www.pastiebin.com/5cb4760eb9abe.
> >
> > Replace beam.pvalue.AsSingleton with beam.pvalue.AsList to get the
> > other error. I'm using Apache Beam python SDK version 2.11.0.
> >
>
>
> It seems the "." at the end of each link got concatenated with the link.
> You need to copy the 2nd and 3rd links into the address bar manually. Sorry about that.
>


Re: Merging different PCollections for writing if BigQuery

2020-02-11 Thread Heejong Lee
What do you mean by "PCollection of dicts, each having different key
values"? What's the type of the PCollections? I assume that you want to
merge two PCollections of KV such as
PCollection[("a", 1), ("b", 2), ("c", 3)] + PCollection[("a", 4), ("d", 5),
("e", 6)]. Is that correct?

On Tue, Feb 11, 2020 at 9:19 AM Douglas Martins 
wrote:

> Hi,
>
> I am developing a Pipeline that reads from and writes to BigQuery. At a
> certain point, I have two or more PCollections of dicts, each having
> different key values. How can I create a single PCollection from those
> that can be written to a BigQuery table? The Flatten transform doesn't work
> because each element of the PCollection ends up having different keys. Thanks!
>


Re: Behavior of KafkaIO

2020-05-11 Thread Heejong Lee
If we assume that there's only one reader, all partitions are assigned to a
single KafkaConsumer. I think the order in which each partition is read depends
on the KafkaConsumer implementation, i.e. how KafkaConsumer.poll() returns
messages.

Reference:
assigning partitions:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
polling records:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
creating a record batch:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
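
As a rough illustration only (not the actual implementation), the source appears
to hand partitions to splits round-robin, so with a single split every partition
ends up on the same consumer and the read order within it is then up to poll():

def assign_partitions(partitions, desired_num_splits):
    # Round-robin: partition i goes to split i % desired_num_splits.
    splits = [[] for _ in range(desired_num_splits)]
    for i, partition in enumerate(partitions):
        splits[i % desired_num_splits].append(partition)
    return splits

# With pipeline parallelism 1, all four partitions land on one reader:
print(assign_partitions(['topic-0', 'topic-1', 'topic-2', 'topic-3'], 1))
# [['topic-0', 'topic-1', 'topic-2', 'topic-3']]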

On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath 
wrote:

> The number of partitions assigned to a given split depends on the
> desiredNumSplits value provided by the runner.
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>
> (This is assuming that you are using the Beam Kafka source, not a native Flink
> override.)
>
> Do you see the same behavior when you increase the number of workers of
> your Flink cluster?
>
> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin 
> wrote:
>
>> Hi community,
>>
>> In my pipeline, I am using KafkaIO to read and write. The source topic
>> has 4 partitions and the pipeline parallelism is 1.
>>
>> I noticed from the consumer lag metrics that it consumes from 1 partition
>> until all the messages from that partition are processed, then it
>> consumes from another partition.
>>
>> Is this the expected behavior?
>>
>> Runner is Flink.
>>
>> Thanks a lot!
>> Eleanore
>>
>


Re: PubsubIO writeProtos with proto3

2020-05-13 Thread Heejong Lee
What protobuf version did you use to generate the Message object? It
looks like ProtoCoder supports both proto2 and proto3.

https://github.com/apache/beam/blob/master/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java#L70

On Wed, May 13, 2020 at 6:43 AM Thinh Ha  wrote:

> Hi All,
>
> Quick question please, when using PubsubIO.writeProtos, the ProtoCoder is
> used:
>
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L571
>
> Does this assume proto2 or proto3? I'm trying to debug a serialization
> issue with enums which might be due to us using proto3 not proto2.
>
> Thank you,
> --
>
> Thinh Ha
>
> thin...@google.com
>
> Strategic Cloud Engineer
>
> +44 (0)20 3820 8009
>
>


Re: beam python on spark-runner

2020-05-14 Thread Heejong Lee
How did you start the Spark job server, and what version of the Apache Beam SDK
did you use?

There were some protocol changes recently, so if the two versions don't match
you may see gRPC errors. If you used the Gradle command on the latest HEAD to
start the Spark job server, I would recommend checking out the source at the
same version as the SDK you installed and trying again.
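
For reference, a minimal sketch of the submission side (not your WordCount job),
assuming the job server was started from the same Beam release you installed
(e.g. via the apache/beam_spark_job_server Docker image or the Gradle runShadow
task) and is listening on localhost:8099:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The SDK and the job server must come from the same Beam release, otherwise
# the job-management gRPC protocol may not match (UNIMPLEMENTED errors).
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=LOOPBACK',  # simplest environment for local testing
])

with beam.Pipeline(options=options) as p:
    p | beam.Create(['hello', 'beam']) | beam.Map(print)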

On Wed, May 13, 2020 at 2:51 PM Naveen M  wrote:

> Hi,
>
> I am trying to run the sample WordCount Beam job with the PortableRunner by
> following the documentation here:
>
> https://beam.apache.org/documentation/runners/spark/
>
> I want to run this as a spark-submit command with the YARN resource manager.
>
> Can you please let me know what is missing here? Thanks for your help.
>
>
> I tried the commands below and I'm getting some weird errors:
>
>
>
>
> spark-submit --master yarn --deploy-mode client --driver-memory 2g
> --executor-memory 1g --executor-cores 1 WordCount.py --input ""
> --output "" --runner PortableRunner --job_endpoint localhost:8099
>
>
>
>
>
>
>   File "/usr/local/lib64/python3.7/site-packages/apache_beam/pipeline.py",
> line 503, in __exit__
>
> self.run().wait_until_finish()
>
>   File "/usr/local/lib64/python3.7/site-packages/apache_beam/pipeline.py",
> line 483, in run
>
> self._options).run(False)
>
>   File "/usr/local/lib64/python3.7/site-packages/apache_beam/pipeline.py",
> line 496, in run
>
> return self.runner.run_pipeline(self, self._options)
>
>   File
> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 384, in run_pipeline
>
> job_service_plan.submit(proto_pipeline)
>
>   File
> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 105, in submit
>
> prepare_response.staging_session_token)
>
>   File
> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 190, in stage
>
> staging_location='')
>
>   File
> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/stager.py",
> line 229, in stage_job_resources
>
> self.stage_artifact(pickled_session_file, staged_path)
>
>   File
> "/usr/local/lib64/python3.7/site-packages/apache_beam/runners/portability/portable_stager.py",
> line 98, in stage_artifact
>
> self._artifact_staging_stub.PutArtifact(artifact_request_generator())
>
>   File "/usr/local/lib64/python3.7/site-packages/grpc/_channel.py", line
> 1011, in __call__
>
> return _end_unary_response_blocking(state, call, False, None)
>
>   File "/usr/local/lib64/python3.7/site-packages/grpc/_channel.py", line
> 729, in _end_unary_response_blocking
>
> raise _InactiveRpcError(state)
>
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated
> with:
>
> status = StatusCode.UNIMPLEMENTED
>
> details = "Method not found:
> org.apache.beam.model.job_management.v1.ArtifactStagingService/PutArtifact"
>
> debug_error_string =
> "{"created":"@1589406258.175447016","description":"Error received from peer
> ipv6:[::1]:8098","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Method
> not found:
> org.apache.beam.model.job_management.v1.ArtifactStagingService/PutArtifact","grpc_status":12}"
>


Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-08 Thread Heejong Lee
DirectRunner is not well tested for cross-language (xlang) transforms, and you
need to specify the jar_packages experimental flag for Java dependencies from
the Python SDK. I'd recommend using 2.22 + FlinkRunner for xlang pipelines.
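
For reference, a rough sketch of what such a pipeline could look like on 2.22 +
FlinkRunner (the topic name, bootstrap servers, and Flink master address below
are placeholders, not values from your setup):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=localhost:8081',  # placeholder Flink REST endpoint
])

with beam.Pipeline(options=options) as p:
    (p
     | ReadFromKafka(
           consumer_config={'bootstrap.servers': 'localhost:9092'},
           topics=['my-topic'])
     | beam.Map(print))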

On Mon, Jun 8, 2020 at 3:27 PM Chamikara Jayalath 
wrote:

> To clarify, the Kafka dependency was already available as an embedded
> dependency in the Java SDK harness, but I'm not sure whether this worked for
> DirectRunner. Starting with 2.22, we'll be staging dependencies from the
> environment during pipeline submission.
>
> On Mon, Jun 8, 2020 at 3:23 PM Chamikara Jayalath 
> wrote:
>
>> Seems like the Java dependency is not being properly set up when running the
>> cross-language Kafka step. I don't think this was available for Beam 2.21.
>> Can you try with the latest Beam HEAD, or with Beam 2.22 when it's released?
>> +Heejong Lee 
>>
>> On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk 
>> wrote:
>>
>>> Pasting the error inline:
>>>
>>> ERROR:root:severity: ERROR
>>> timestamp {
>>>   seconds: 1591405163
>>>   nanos: 81500
>>> }
>>> message: "Client failed to dequeue and process the value"
>>> trace: "org.apache.beam.sdk.util.UserCodeException:
>>> java.lang.NoClassDefFoundError:
>>> org/springframework/expression/EvaluationContext\n\tat
>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>>> Source)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>>> Source)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>> Source)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>>> org.apache.beam