Upload third party runtime dependencies for expanding transform like KafkaIO.Read in Python Portable Runner

2020-09-30 Thread Kobe Feng
Hi everyone,
Is there a recommended way to upload a third-party jar (runtime scope) for an
expanded transform like KafkaIO.Read when using the Python portable runner?
Thank you!

I tried --experiments=jar_packages=abc.jar,d.jar, but those artifacts only show
up in the Python harness via the provision info; the Java harness still uses its
default environment for dependencies after the transform is expanded by the
expansion service (the gRPC server started from the expansion jar) for reading
Kafka messages.

I also noticed that this option will be removed in the future, so I tried
--files_to_stage, but that option only exists in the Java SDK pipeline options.

-- 
Yours Sincerely
Kobe Feng


Re: Upload third party runtime dependencies for expanding transform like KafkaIO.Read in Python Portable Runner

2020-10-02 Thread Kobe Feng
Just a follow-up, since no one has replied yet.
My understanding is that Beam wants the environment for any expanded transform
to be self-described.
So I updated the boot script and Dockerfile for the Java harness environment and
used --sdk_harness_container_image_overrides with the portable runner, but the
updated image is never loaded (the default is still used). From a glance at the
code, I suspect only the Dataflow runner supports that option. I still believe
this is the right direction and just need to dig into the code when I get back
to it; I'll update this thread then.

Kobe
On Wed, Sep 30, 2020 at 1:26 PM Kobe Feng  wrote:

> Hi everyone,
> Is there any recommended way to upload a third party jar (runtime scope)
> for expanding transform like KafkaIO.Read when using the python portable
> runner? Thank you!
>
> I tried --experiments=jar_packages=abc.jar,d.jar but just found those
> artifacts in python harness with provision info, and the java harness just
> uses the default environment for dependencies after expanding
> transformation from the grpc server upon expansion jar for reading Kafka
> messages.
>
> Also noticed above option will be removed in the future then tried
> --files_to_stage but this option only exists in Java SDK pipeline options.
>
> --
> Yours Sincerely
> Kobe Feng
>


-- 
Yours Sincerely
Kobe Feng


Re: Upload third party runtime dependencies for expanding transform like KafkaIO.Read in Python Portable Runner

2020-10-02 Thread Kobe Feng
Thanks Robert. Yes, our Kafka requires a JAAS configuration (sasl.jaas.config)
on the client side for its security check, with a corresponding LoginModule that
needs additional classes:
==
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: io.${}.kafka.security.iaf.IAFLoginModule
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
        at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
        at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
        at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:76)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
        ... 42 more

        at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
        at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)

On Fri, Oct 2, 2020 at 5:14 PM Robert Bradshaw  wrote:

> Could you clarify a bit exactly what you're trying to do? When using
> KafkaIO, the provided jar should have all the necessary dependencies to
> construct and execute the kafka read/write. Is there some reason you need
> to inject additional dependencies into the environment provided by kafka?
>
> On Fri, Oct 2, 2020 at 3:20 PM Kobe Feng  wrote:
>
>> Just a followup since no one replied it.
>> My understanding is for any expanded transforms beam wants the
>> environment self-described.
>> So I updated boot and dockerfile for the java harness environment and use
>> --sdk_harness_container_image_overrides in portable runner but fail to see
>> the updated image loaded (default still), I guess only dataflow runner
>> support it by glancing the code, but I believe it's the correct way and
>> just need to deep dive the codes here when I turn back, then I will update
>> this thread too.
>>
>> Kobe
>> On Wed, Sep 30, 2020 at 1:26 PM Kobe Feng  wrote:
>>
>>> Hi everyone,
>>> Is there any recommended way to upload a third party jar (runtime scope)
>>> for expanding transform like KafkaIO.Read when using the python portable
>>> runner? Thank you!
>>>
>>> I tried --experiments=jar_packages=abc.jar,d.jar but just found those
>>> artifacts in python harness with provision info, and the java harness just
>>> uses the default environment for dependencies after expanding
>>> transformation from the grpc server upon expansion jar for reading Kafka
>>> messages.
>>>
>>> Also noticed above option will be removed in the future then tried
>>> --files_to_stage but this option only exists in Java SDK pipeline options.
>>>
>>> --
>>> Yours Sincerely
>>> Kobe Feng
>>>
>>
>>
>> --
>> Yours Sincerely
>> Kobe Feng
>>
>

-- 
Yours Sincerely
Kobe Feng


Re: Upload third party runtime dependencies for expanding transform like KafkaIO.Read in Python Portable Runner

2020-10-02 Thread Kobe Feng
Thanks Robert. Yes, I was also thinking of running my own expansion service, and
I'm trying it now:

[grpc-default-executor-3] INFO io.x.kafka.security.token.RefreshableTokenLoginModule - starting renewal task and exposing its jmx metrics
[grpc-default-executor-3] INFO io..kafka.security.token.TokenRenewalTask - IAF Token renewal started
[grpc-default-executor-3] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[grpc-default-executor-3] INFO io..kafka.security.token.TokenRenewalTask - proposed next checkpoint time Sat Oct 03 11:38:29 UTC 2020, now is Sat Oct 03 02:02:24 UTC 2020, min expiration Sat Oct 03 14:02:30 UTC 2020
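
For reference, the pipeline side looks roughly like the sketch below (assumptions: the
expansion service built with the extra security jars is already running on
localhost:8097, the portable job endpoint is on localhost:8099, and the broker,
topic, and LoginModule names are placeholders):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=DOCKER',
])

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | 'ReadKafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'broker1:9092',
                'security.protocol': 'SASL_PLAINTEXT',
                # Placeholder for the custom LoginModule that needs the extra
                # jars, plus whatever sasl.mechanism etc. the cluster requires.
                'sasl.jaas.config':
                    'com.example.kafka.security.CustomLoginModule required;',
            },
            topics=['my-topic'],
            # Point at the expansion service that has the extra jars on its
            # classpath instead of the default one bundled with Beam.
            expansion_service='localhost:8097')
        # ReadFromKafka yields (key, value) pairs; keep the values.
        | 'Values' >> beam.Map(lambda kv: kv[1]))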

I very much agree with your last statement!

Happy weekend!


On Fri, Oct 2, 2020 at 6:31 PM Robert Bradshaw  wrote:

> If you make sure that these extra jars are in your path when you
> execute your pipeline, they should get picked up when invoking the
> expansion service (though this may not be the case long term).
>
> The cleanest way would be to provide your own expansion service. If
> you build a jar that consists of Beam's IO expansion service plus any
> necessary dependencies, you should be able to do
>
> ReadFromKafka(
> [ordinary params],
> expansion_service=BeamJarExpansionService('path/to/your/jar'))
>
> to use this "custom" expansion service. See
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py
> An alternative is to pass a pipeline option
>
> --beam_services={"sdks:java:io:expansion-service:shadowJar":
> "path/to/your/jar"}
>
> which will override the default. (You can pass "host:port" rather than
> a path as well if you manually start the expansion service.)
>
> Exactly how to specify at a top level a set of extra dependencies to
> be applied to a particular subset of other-language transforms is
> still an open problem. Alternatively we could try to make expansion
> services themselves trivially easy to build, customize, and use.
>
> Hopefully that helps.
>
> - Robert
>
>
>
>
> On Fri, Oct 2, 2020 at 5:57 PM Kobe Feng  wrote:
> >
> > Thanks Rober, yes, our Kafka requires JAAS configuration
> (sasl.jaas.config) at the client side for security check with the
> corresponding LoginModule which requires additional classes:
> >
> ==
> > Caused by: javax.security.auth.login.LoginException: unable to find
> LoginModule class: io.${}.kafka.security.iaf.IAFLoginModule
> > at
> javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
> > at
> javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
> > at
> javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
> > at
> javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at
> javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
> > at
> javax.security.auth.login.LoginContext.login(LoginContext.java:587)
> > at
> org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
> > at
> org.apache.kafka.common.security.authenticator.LoginManager.(LoginManager.java:53)
> > at
> org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:76)
> > at
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
> > ... 42 more
> >
> > at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
> > at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
> >
> > On Fri, Oct 2, 2020 at 5:14 PM Robert Bradshaw 
> wrote:
> >>
> >> Could you clarify a bit exactly what you're trying to do? When using
> KafkaIO, the provided jar should have all the necessary dependencies to
> construct and execute the kafka read/write. Is there some reason you need
> to inject additional dependencies into the environment provided by kafka?
> >>
> >> On Fri, Oct 2, 2020 at 3:20 PM Kobe Feng  wrote:
> >>>
> >>> Just a followup since no one replied it.
> >>> My understanding is for any expanded transforms beam wants the
> environment self-described.
> >>> So I updated boot and dockerfile for the java harness environment and
> use --sdk_har

Re: Quick question regarding production readiness of ParquetIO

2020-12-01 Thread Kobe Feng
Tao, my experience with ParquetIO has been good (versions 2.11, 2.18, 2.21).
We mainly use it as a Hadoop sink by converting Avro records to Parquet; we
checked for data loss and data-quality problems and found none, and saw no
performance issues either.

Here is one code snippet. (The reason we have our own ParquetIO is to remove the
partition field from the record when the user asks for it, since a Hive/Spark
partitioned table already encodes that value in the HDFS path and uses it for
scan filtering.)

def toHadoop(basePath: String, recordPartition: RecordPartition,
    fileNaming: FileNaming, shardNum: Int,
    includePartitionFields: Boolean = false): Unit = {
  val baseDir = HadoopClient.resolve(basePath, env)
  pCollection.apply("darwin.write.hadoop.parquet." + postfix,
    FileIO.writeDynamic[String, GenericRecord]()
      .by(recordPartition.partitionFunc)
      .withDestinationCoder(StringUtf8Coder.of())
      .via(DarwinParquetIO.sink(
        recordPartition.getOutputSchema(avroSchema, includePartitionFields),
        recordPartition.getPartitionFields(),
        includePartitionFields))
      .to(baseDir)
      .withCompression(Compression.LZO)
      .withNaming((partitionFolder: String) =>
        relativeFileNaming(
          StaticValueProvider.of[String](baseDir + Path.SEPARATOR + partitionFolder),
          fileNaming))
      .withNumShards(shardNum))
}
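
For comparison, a plain (non-dynamic) Parquet write in the Python SDK is only a
few lines; this is just a sketch with made-up fields and an assumed local output
path:

import pyarrow as pa
import apache_beam as beam
from apache_beam.io.parquetio import WriteToParquet

# The pyarrow schema replaces the Avro schema used on the Java side.
schema = pa.schema([('id', pa.int64()), ('name', pa.string())])

with beam.Pipeline() as p:
    _ = (p
         | beam.Create([{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}])
         | WriteToParquet('/tmp/records/part', schema, file_name_suffix='.parquet'))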


On Tue, Dec 1, 2020 at 3:44 AM Alexey Romanenko 
wrote:

> ParquetIO has existed in Beam since the 2.5.0 release, so it can be considered
> quite stable and mature. I'm not aware of any open major issues, and you can
> check the performance here [1][2].
>
> On the other hand, you are right: it's annotated with @Experimental, as are
> many other Beam Java IOs and components, which makes people confused. There is
> a long story behind this in Beam and we have had several related discussions
> (the latest one [3]) on how to reduce the number of these "experimental"s.
>
> [1]
> http://metrics.beam.apache.org/d/bnlHKP3Wz/java-io-it-tests-dataflow?panelId=16&fullscreen&orgId=1
> [2]
> http://metrics.beam.apache.org/d/bnlHKP3Wz/java-io-it-tests-dataflow?panelId=17&fullscreen&orgId=1
> [3]
> https://lists.apache.org/thread.html/0f769736be1cf2fc5227f7a25dd3fdbb9296afe8a071761cb91f588a%40%3Cdev.beam.apache.org%3E
>
> On 30 Nov 2020, at 22:13, Tao Li  wrote:
>
> Hi Beam community,
>
> According to this link the  ParquetIO is still considered experimental:
> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html
>
> Does it mean it’s not yet ready for prod usage? If that’s the case, when
> will it be ready?
>
> Also, is there any known performance/scalability/reliability issue with
> ParquetIO?
>
> Thanks a lot!
>
>
>

-- 
Yours Sincerely
Kobe Feng


Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Kobe Feng
Hi all,
Sorry to step in. This case reminds me of a similar requirement at my company
from a while back: plugging lambda functions into a Beam pipeline dynamically
(filtering, selecting, etc.) without restarting the job, along the lines of
Flink stateful functions, Akka, and so on.

Generally, SQL defines its input, output, and transformation explicitly, which
usually means a fixed schema and coder (SELECT * being the arbitrary case).
Nowadays SQL is shifting toward NewSQL under the influence of NoSQL, decoupling
from the storage layer and loosening those restrictions in exchange for more
flexible processing.

So if we want to support schema-free streaming pipelines natively, could we
consider providing that capability in Beam core as well (for better
transparency, and so the SQL layer could build on it too)? For example:
pluggable coders with runtime compatibility checks against previous ones,
stateful functions (not Beam's stateful processing), and in/out data carrying a
schema ID for schema-based transforms.

I've been away from Apache Beam for a while, so apologies if Beam already has
such native support or if I've misunderstood.

Thanks!
Kobe Feng

On Tue, Dec 8, 2020 at 3:15 PM Reuven Lax  wrote:

> Talat, are you interested in writing a proposal and sending it to
> d...@beam.apache.org? We could help advise on the options.
>
> Reuven
>
> On Tue, Dec 8, 2020 at 10:28 AM Andrew Pilloud 
> wrote:
>
>> We could support EXCEPT statements in proposal 2 as long as we restricted
>> it to known fields.
>>
>> We are getting into implementation details now. Making unknown fields
>> just a normal column introduces a number of problems. ZetaSQL doesn't
>> support Map type. All our IOs would need to explicitly deal with that
>> special column. There would be a lack of consistency between the various
>> types (Avro, Proto, Json) which should all support this.
>>
>> We might also want something even more invasive: everything is an unknown
>> field unless it is referenced in the SQL query. All of these options are
>> possible. I guess we need someone who has time to work on it to write a
>> proposal.
>>
>> On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax  wrote:
>>
>>> I'm not sure that we could support EXCEPT statements, as that would
>>> require introspecting the unknown fields (what if the EXCEPT statement
>>> matches a field that later is added as an unknown field?). IMO this sort of
>>> behavior only makes sense on true pass-through queries. Anything that
>>> modifies the input record would be tricky to support.
>>>
>>> Nested rows would work for proposal 2. You would need to make sure that
>>> the unknown-fields map is recursively added to all nested rows, and you
>>> would do this when you infer a schema from the avro schema.
>>>
>>> On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud 
>>> wrote:
>>>
>>>> Proposal 1 would also interact poorly with SELECT * EXCEPT ...
>>>> statements, which returns all columns except specific ones. Adding an
>>>> unknown field does seem like a reasonable way to handle this. It probably
>>>> needs to be something that is native to the Row type, so columns added to
>>>> nested rows also work.
>>>>
>>>> Andrew
>>>>
>>>> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax  wrote:
>>>>
>>>>> There's a difference between a fully dynamic schema and simply being
>>>>> able to forward "unknown" fields to the output.
>>>>>
>>>>> A fully-dynamic schema is not really necessary unless we also had
>>>>> dynamic SQL statements. Since the existing SQL statements do not reference
>>>>> the new fields by name, there's no reason to add them to the main schema.
>>>>>
>>>>> However, if you have a SELECT * FROM WHERE  statement that does no
>>>>> aggregation, there's fundamentally no reason we couldn't forward the
>>>>> messages exactly. In theory we could forward the exact bytes that are in
>>>>> the input PCollection, which would necessarily forward the new fields. In
>>>>> practice I believe that we convert the input messages to Beam Row objects
>>>>> in order to evaluate the WHERE clause, and then convert back to Avro to
>>>>> output those messages. I believe this is where we "lose" the unknown
>>>>> messages, but this is an implementation artifact - in theory we could output
>>>>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>>>>> schema, since you can't really do anything

Re: About Beam SQL Schema Changes and Code generation

2020-12-08 Thread Kobe Feng
Talat, my bad. First things first: to resolve the issue, your proposal would
definitely be a useful starting point for researching schema evolution in a Beam
pipeline, and I can comment there if anything comes up.

Andrew's first reply is clear about the intention and scope for Apache Beam: a
static graph for maximum optimization.
I just think both approaches are more of a compromise that the application
itself could make if it converts between different formats.

Maybe in the future we could optionally choose maximum performance (fixed coder,
static SQL plan), maximum flexibility (schema evolution, pluggable lambdas,
dynamic routing, etc.), or a balance of the two when using SQL ^ ^


Re: Quick question regarding ParquetIO

2021-01-08 Thread Kobe Feng
https://beam.apache.org/releases/javadoc/2.26.0/org/apache/beam/sdk/io/AvroIO.html
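
(Side note: in the Python SDK, parquetio.ReadFromParquet infers the schema from
the Parquet files themselves and yields plain dicts, so no Avro schema is needed
up front. A minimal sketch, with an example path:)

import apache_beam as beam
from apache_beam.io.parquetio import ReadFromParquet

with beam.Pipeline() as p:
    _ = (p
         # Each element is a dict keyed by column name; no schema is passed in.
         | ReadFromParquet('/path/to/input/*.parquet')
         | beam.Map(print))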
>
>
>
> I have one more quick question regarding the "reading records of an
> unknown schema" scenario. In the sample code a PCollection<Foo> is being
> returned, and parseGenericRecords requires parsing logic. What if I just want
> to get a PCollection<GenericRecord> instead of a specific class (e.g. Foo in
> the example)? I guess I can just skip the ParquetIO.parseGenericRecords
> transform? So do I still have to specify the dummy parsing logic like below?
> Thanks!
>
> p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, GenericRecord>() {
>
>    public GenericRecord apply(GenericRecord record) {
>
>      return record;
>
>    }
>
> }))
>
> From: Alexey Romanenko 
> Reply-To: "user@beam.apache.org" 
> Date: Wednesday, January 6, 2021 at 10:13 AM
> To: "user@beam.apache.org" 
> Subject: Re: Quick question regarding ParquetIO
>
>
>
> Hi Tao,
>
>
>
> This jira [1] looks like exactly what you are asking for, but it was merged
> recently (thanks to Anant Damle for working on this!) and it should be
> available only in Beam 2.28.0.
>
>
>
> [1] https://issues.apache.org/jira/browse/BEAM-11460
>
>
>
> Regards,
>
> Alexey
>
>
>
>
>
> On 6 Jan 2021, at 18:57, Tao Li  wrote:
>
>
>
> Hi beam community,
>
>
>
> Quick question about ParquetIO
> <https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.html>.
> Is there a way to avoid specifying the avro schema when reading parquet
> files? The reason is that we may not know the parquet schema until we read
> the files. In comparison, spark parquet reader
> <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html>
>  does not require such a schema specification.
>
>
>
> Please advise. Thanks a lot!
>
>
>


-- 
Yours Sincerely
Kobe Feng


Re: Does writeDynamic() support writing different element groups to different output paths?

2021-03-03 Thread Kobe Feng
I used the following approach a long time ago for writing into partitions on
HDFS (others may have better solutions), and I'm not sure whether the interface
has changed since then, so please double-check:

val baseDir = HadoopClient.resolve(basePath, env)
datum.apply("darwin.write.hadoop.parquet." + postfix,
  FileIO.writeDynamic[String, GenericRecord]()
    .by(recordPartition.partitionFunc)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(DarwinParquetIO.sink(...))
    .to(baseDir)
    ...
    .withNaming((partitionFolder: String) =>
      relativeFileNaming(
        StaticValueProvider.of[String](baseDir + Path.SEPARATOR + partitionFolder),
        fileNaming))
    ...

partitionFunc has type T => String. A good practice is to auto-switch: partition
on an event-time field from the record value when you are in an event-time
window, and fall back to processing time otherwise. partitionFunc can also
combine multiple partition columns into subdirectories using your file system's
path separator (e.g. on S3); a rough sketch of that idea is below.
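
Here is that partition-key idea as a sketch (shown in Python for brevity; the
field name and key layout are hypothetical, so adapt them to your records):

from datetime import datetime, timezone

def hourly_partition_key(record):
    """Build an hourly folder key from the record's event time, falling back to
    processing time when the field is absent."""
    ts_millis = record.get('event_time')  # assumed epoch-millis field
    dt = (datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
          if ts_millis is not None else datetime.now(timezone.utc))
    # Multiple partition columns become nested folders via the path separator.
    return dt.strftime('dt=%Y-%m-%d/hour=%H')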


On Wed, Mar 3, 2021 at 5:36 PM Tao Li  wrote:

> Hi Beam community,
>
>
>
> I have a streaming app that writes every hour’s data to a folder named
> with this hour. With Flink (for example), we can leverage “Bucketing File
> Sink”:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/filesystem_sink.html
>
>
>
> However I am not seeing Beam FileIO’s writeDynamic API supports specifying
> different output paths for different groups:
> https://beam.apache.org/releases/javadoc/2.28.0/index.html?org/apache/beam/sdk/io/FileIO.html
>
>
>
> Seems like writeDynamic() only supports specifying different naming
> strategy.
>
>
>
> How can I specify different hourly based output paths for hourly data with
> Beam writeDynamic? Please advise. Thanks!
>
>
>
>
>


-- 
Yours Sincerely
Kobe Feng