Re: Stateful & Timely Call

2020-04-27 Thread Reza Rokni
Great idea!

On Fri, 24 Apr 2020, 22:33 Ismaël Mejía,  wrote:

> Sounds like a good addition to the Beam patterns page Reza :)
>
> On Fri, Apr 24, 2020 at 3:22 AM Aniruddh Sharma 
> wrote:
> >
> > Thanks Robert,
> >
> > This is a lifesaver and it's a great help :). It works like a charm.
> >
> > Thanks
> > Aniruddh
> >
> > On Thu, Apr 23, 2020 at 4:45 PM Robert Bradshaw 
> wrote:
> >>
> >> I may have misinterpreted your email; I thought you didn't have a need
> for keys at all. If this is actually the case, you don't need a GroupByKey;
> just have your DoFn take Rows as input and emit List<Row> as output. That
> is, it's a DoFn<Row, List<Row>>.
> >>
> >> You can buffer multiple Rows in an instance variable between process
> element calls. For example,
> >>
> >> import java.util.ArrayList;
> >> import java.util.List;
> >> import org.apache.beam.sdk.transforms.DoFn;
> >> import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
> >> import org.joda.time.Instant;
> >>
> >> class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
> >>   List<T> buffer = new ArrayList<>();
> >>
> >>   @ProcessElement
> >>   public void processElement(@Element T elt, OutputReceiver<List<T>> out) {
> >>     buffer.add(elt);  // buffer the element itself
> >>     if (buffer.size() > 100) {  // flush once the batch is large enough
> >>       out.output(buffer);
> >>       buffer = new ArrayList<>();
> >>     }
> >>   }
> >>
> >>   // @FinishBundle takes a FinishBundleContext rather than an OutputReceiver,
> >>   // so the final flush needs an explicit timestamp/window (global window here,
> >>   // matching the batch use case in this thread).
> >>   @FinishBundle
> >>   public void finishBundle(FinishBundleContext c) {
> >>     if (!buffer.isEmpty()) {
> >>       c.output(buffer, Instant.now(), GlobalWindow.INSTANCE);
> >>       buffer = new ArrayList<>();
> >>     }
> >>   }
> >> }
> >>
> >> See
> https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
> for more information on the lifetime of DoFns.
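> >>
> >> (A brief usage sketch, not from the original message: the PCollection name
> >> "rows" and the element type Row are assumed names standing in for whatever
> >> input collection and row type the pipeline actually uses.)
> >>
> >>   PCollection<List<Row>> batches =
> >>       rows.apply(ParDo.of(new MyBufferingDoFn<Row>()));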
> >>
> >> As for why your GBK is taking so long, yes, this can be a bottleneck.
> However, it should be noted that Dataflow (like most other runners)
> executes this step in conjunction with other steps as part of a "fused
> stage." So if your pipeline looks like
> >>
> >> Read -> DoFnA -> GBK -> DoFnB -> Write
> >>
> >> then Read, DoFnA, and GBK[part1] will execute concurrently (all
> starting up almost immediately), one element at a time, and when that's
> finished, GBK[part2], DoFnB, and Write will execute concurrently, one element
> at a time, so you can't just look at the last unfinished stage to determine
> where the bottleneck is. (One helpful tool, however, is looking at the
> amount of time spent on each step in the UI.)
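> >>
> >> (An illustrative sketch, not from the original message, to make the stage
> >> boundaries concrete: the transforms below are placeholders standing in for
> >> Read, DoFnA, DoFnB, and Write, assuming the usual Beam SDK imports, with
> >> comments marking the fused stage each step runs in.)
> >>
> >>   Pipeline p = Pipeline.create();
> >>   p.apply("Read", Create.of("apple", "avocado", "banana"))           // stage 1
> >>       .apply("DoFnA", MapElements
> >>           .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
> >>           .via((String s) -> KV.of(s.substring(0, 1), s)))           // stage 1
> >>       .apply("GBK", GroupByKey.create())          // stage 1 writes, stage 2 reads
> >>       .apply("DoFnB", MapElements.into(TypeDescriptors.strings())
> >>           .via((KV<String, Iterable<String>> kv) ->
> >>               kv.getKey() + ": " + kv.getValue()))                   // stage 2
> >>       .apply("Write", TextIO.write().to("/tmp/out"));                // stage 2
> >>   p.run().waitUntilFinish();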
> >>
> >> Hopefully that helps.
> >>
> >> - Robert
> >>
> >>
> >> On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma 
> wrote:
> >>>
> >>> Thanks Robert and Luke
> >>>
> >>> This approach seems good to me. I am trying that; I have to include a
> GroupBy to make an Iterable available to the ParDo function to do the same.
> Now the GroupBy is a bottleneck: it has been running for the last 2 hours and
> has processed only 40 GB of data (still waiting for the rest of the hundreds of GB of data).
> >>>
> >>> Currently I use GroupByKey.create().
> >>>
> >>> What's the recommended way to choose keys to make it execute faster:
> the same key for all rows, a different key for each row, or the same key for
> a group of rows?
> >>>
> >>> Thanks
> >>> Aniruddh
> >>>
> >>> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik  wrote:
> 
>  As Robert suggested, what prevents you from doing:
>  ReadFromBQ -> ParDo(BatchInMemory) -> DLP
>  where BatchInMemory stores elements in an in-memory list in the
> @ProcessElement method, produces output every time the list is large enough,
> and emits a final output in the @FinishBundle method?
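> 
>  A rough sketch of such a BatchInMemory DoFn (not from this thread), batching
>  com.google.privacy.dlp.v2.Table.Row elements by approximate serialized size;
>  the class name, the use of getSerializedSize(), and the 512 KB limit (taken
>  from the use case mentioned elsewhere in the thread) are illustrative
>  assumptions, not a definitive implementation:
> 
>  import java.util.ArrayList;
>  import java.util.List;
>  import com.google.privacy.dlp.v2.Table;
>  import org.apache.beam.sdk.transforms.DoFn;
>  import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
>  import org.joda.time.Instant;
> 
>  class BatchRowsBySizeFn extends DoFn<Table.Row, List<Table.Row>> {
>    private static final long MAX_BATCH_BYTES = 512 * 1024;
>    private transient List<Table.Row> buffer;
>    private transient long bufferBytes;
> 
>    @StartBundle
>    public void startBundle() {
>      buffer = new ArrayList<>();
>      bufferBytes = 0;
>    }
> 
>    @ProcessElement
>    public void processElement(@Element Table.Row row, OutputReceiver<List<Table.Row>> out) {
>      long rowBytes = row.getSerializedSize();  // protobuf-encoded size of this row
>      // Flush the current batch before it would exceed the size limit.
>      if (!buffer.isEmpty() && bufferBytes + rowBytes > MAX_BATCH_BYTES) {
>        out.output(buffer);
>        buffer = new ArrayList<>();
>        bufferBytes = 0;
>      }
>      buffer.add(row);
>      bufferBytes += rowBytes;
>    }
> 
>    @FinishBundle
>    public void finishBundle(FinishBundleContext c) {
>      // Emit whatever is left at the end of the bundle; assumes global windows,
>      // as in the batch use case discussed in this thread.
>      if (!buffer.isEmpty()) {
>        c.output(buffer, Instant.now(), GlobalWindow.INSTANCE);
>      }
>    }
>  }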
> 
>  On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma 
> wrote:
> >
> > Hi Luke
> >
> > Sorry, I forgot to mention the functions. Dataflow adds the following
> function, ["PartitionKeys", new GroupByKeyAndSortValuesOnly], and this is
> super slow. How do I choose keys to make it faster?
> >
> >   .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
> >   .setCoder(
> >       KvCoder.of(
> >           keyCoder,
> >           KvCoder.of(InstantCoder.of(),
> >               WindowedValue.getFullCoder(kvCoder, windowCoder))))
> >
> >   // Group by key and sort by timestamp, dropping windows as they are reified
> >   .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
> >
> >   // The GBKO sets the windowing strategy to the global default
> >   .setWindowingStrategyInternal(inputWindowingStrategy);
> >
> > THanks
> > ANiruddh
> >
> > On 2020/04/23 16:35:58, Aniruddh Sharma 
> wrote:
> > > Thanks Luke for your response.
> > >
> > > My use case is following.
> > > a) I read data from BQ (TableRow).
> > > b) Convert it into (Table.Row) for DLP calls.
> > > c) Have to batch the Table.Row collection up to a max size of 512 KB
> (i.e., fit many rows from BQ into a single DLP table) and call DLP.
> > >
> > > Functionally, I don't have a need for keys or windows, as I just
> want to fit rows into a DLP table up to a max size.
> > >
> > > In batch mode, when I call the stateful API,
> > > it adds a
> "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step, and this
> step is super slow. For example, it has been running on a 50-node cluster on
> 800 GB of data for the last 10 hours.
> > >
> > > This step is not added when I call

HCatalogIO - Trying to read table metadata (column names and indexes)

2020-04-27 Thread Gershi, Noam
Hi
Using HCatalogIO as a source, I am trying to read the table columns.

Code:

PCollection<HCatRecord> hcatRecords = input
    .apply(HCatalogIO.read()
        .withConfigProperties(configProperties)
        .withDatabase("db-name")
        .withTable("my-table-name"));
...
HCatalogBeamSchema hcatSchema =
    HCatalogBeamSchema.create(ImmutableMap.of("table", "my-table-name"));
Schema schema = hcatSchema.getTableSchema("db-name", "my-table-name").get();
List<Schema.Field> fields = schema.getFields();


I get:

20/04/27 09:12:16 INFO LineBufferedStream: Caused by: 
java.lang.UnsupportedOperationException: The type 'decimal(30,16)' of field 
'amount' is not supported.
20/04/27 09:12:16 INFO LineBufferedStream:  at 
org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamField(SchemaUtils.java:60)
20/04/27 09:12:16 INFO LineBufferedStream:  at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
20/04/27 09:12:16 INFO LineBufferedStream:  at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
20/04/27 09:12:16 INFO LineBufferedStream:  at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
20/04/27 09:12:16 INFO LineBufferedStream:  at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
20/04/27 09:12:16 INFO LineBufferedStream:  at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
20/04/27 09:12:16 INFO LineBufferedStream:  at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
20/04/27 09:12:16 INFO LineBufferedStream:  at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
20/04/27 09:12:16 INFO LineBufferedStream:  at 
org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema(SchemaUtils.java:53)
20/04/27 09:12:16 INFO LineBufferedStream:  at 
org.apache.beam.sdk.io.hcatalog.HCatalogBeamSchema.getTableSchema(HCatalogBeamSchema.java:83)

Thanx in advance,
Noam



Re: Kafka IO: value of expansion_service

2020-04-27 Thread Kyle Weaver
I'm not sure about the org.springframework.expression.EvaluationContext
issue, but "local class incompatible" usually happens when using Beam
components built from different sources. Make sure to rebuild everything
from the same commit.

On Sat, Apr 25, 2020 at 10:07 AM Piotr Filipiuk 
wrote:

> After syncing to:
>
> commit 24361d1b5981ef7d18e586a8e5deaf683f4329f1 (HEAD -> master,
> origin/master, origin/HEAD)
> Author: Ning Kang 
> Date:   Fri Apr 24 10:58:07 2020 -0700
>
> The new error is:
>
> RuntimeError: java.lang.IllegalArgumentException: unable to deserialize
> Custom DoFn With Execution Info
> at
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> at
> org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:697)
> at
> org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:360)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.<init>(FnApiDoFnRunner.java:356)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:165)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.createRunnerForPTransform(FnApiDoFnRunner.java:141)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:233)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:196)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:474)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:271)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:534)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:266)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.InvalidClassException:
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn; local class
> incompatible: stream classdesc serialVersionUID = 7311199418509482705,
> local class serialVersionUID = 5488866827627794770
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
> at
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
> ... 18 more
>
> I am not sure it is related to
> https://issues.apache.org/jira/browse/BEAM-9745.
>
> On Wed, Apr 22, 2020 at 2:48 PM Piotr Filipiuk 
> wrote:
>
>> Here is an error I am getting when using DirectRunner:
>>
>> DEBUG:apache_beam.runners.portability.fn_api_runner.fn_runner:Wait for
>> the bundle bundle_1 to finish.
>> 150f165c51d9ffbd902b6e80f691d095eb233812bb780625a95ab96a1134d951
>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
>> Requests sent by runner: [('bundle_1', 1)]
>> DEBUG:apache_beam.runners.portability.fn_api_runner.worker_handlers:Runner:
>> Requests multiplexing info: []
>> Traceback (most recent call last):
>>   File
>> "/Users/piotr.filipiuk/.pyenv/versions/3.6.5/lib/python3.6/runpy.py", line
>> 193, in _run_module_as_main
>> "__main__", mod_spe