Re: Spring with Apache Beam

2019-10-14 Thread Luke Cwik
+user  to see if anyone else can provide guidance.

On Fri, Oct 11, 2019 at 5:31 PM Jitendra kumavat 
wrote:

> I have added the plugin and checked my jar; it contains my service
> registration. Still unable to run my JvmInitializer class.
>
> <plugin>
>   <groupId>org.apache.maven.plugins</groupId>
>   <artifactId>maven-shade-plugin</artifactId>
>   <version>3.2.1</version>
>   <executions>
>     <execution>
>       <goals>
>         <goal>shade</goal>
>       </goals>
>       <configuration>
>         <transformers>
>           <transformer
>               implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>         </transformers>
>       </configuration>
>     </execution>
>   </executions>
> </plugin>
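>
> As a minimal sketch of what that merge should produce (the package name
> com.liveramp.intl.gcm is assumed from the @ComponentScan base package
> elsewhere in this thread), the shaded jar should then contain a file like:
>
> # META-INF/services/org.apache.beam.sdk.harness.JvmInitializer
> com.liveramp.intl.gcm.GcmJvmInitializer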
>
>
> On Fri, Oct 11, 2019 at 2:56 PM Luke Cwik  wrote:
>
>> It is a common issue for users' build processes that use the Maven shade
>> plugin or jarjar to lose the META-INF/services files.
>> Can you verify that the jars you submitted to Dataflow contain the
>> META-INF/services files?
>>
>> See this Stack Overflow question
>> https://stackoverflow.com/questions/44365545/apache-beam-unable-to-find-registrar-for-gs
>> for more details on a similar problem.
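>>
>> As a quick sanity check from code, here is a sketch (the ServiceCheck class
>> name is made up; it assumes the Beam SDK plus your shaded jar are on the
>> classpath):
>>
>> import java.util.ServiceLoader;
>> import org.apache.beam.sdk.harness.JvmInitializer;
>>
>> public class ServiceCheck {
>>   public static void main(String[] args) {
>>     // Prints every JvmInitializer registered via META-INF/services;
>>     // an empty result means the shade/jarjar step dropped the entry.
>>     for (JvmInitializer init : ServiceLoader.load(JvmInitializer.class)) {
>>       System.out.println(init.getClass().getName());
>>     }
>>   }
>> }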
>>
>>
>> On Fri, Oct 11, 2019 at 2:38 PM Jitendra kumavat 
>> wrote:
>>
>>> Hi Luke,
>>>
>>> I added @AutoService for my custom initializer/processor, and I can see
>>> the entries in the META-INF/services folder in my build output. Please see
>>> the attached screenshot. Still, while running the Dataflow job on Google
>>> Cloud, I cannot see my System.out statements in the logs. I believe it is
>>> not picking up the GcmJvmInitializer.
>>>
>>>
>>> Steps which I follow to run the job:
>>>
>>> 1. mvn clean install
>>> 2. Create the Dataflow template in a GCS bucket using the mvn compile command.
>>> 3. Run the job using gcloud dataflow jobs 
>>>
>>> Can you please tell me what I am doing wrong here? Thanks.
>>>
>>> Below is my class structure:
>>>
>>> @AutoService(Processor.class)
>>> @SupportedAnnotationTypes(
>>> "org.springframework.context.annotation.*")
>>> @SupportedSourceVersion(SourceVersion.RELEASE_8)
>>> public class MyProcessor extends AbstractProcessor {
>>>   @Override
>>>   public boolean process(Set<? extends TypeElement> annotations,
>>>       RoundEnvironment roundEnv) {
>>> return false;
>>>   }
>>> }
>>>
>>>
>>>
>>>
>>> @AutoService(JvmInitializer.class) // the service interface, not the implementing class
>>> public class GcmJvmInitializer implements JvmInitializer {
>>>
>>>   @Override
>>>   public void onStartup() {
>>> System.out.println("Starting Custom GcmJvmInitializer");
>>>     ApplicationContext applicationContext =
>>>         new AnnotationConfigApplicationContext(AppConfig.class);
>>> for (String beanName : applicationContext.getBeanDefinitionNames()) {
>>>   System.out.println(beanName);
>>> }
>>> System.out.println("Stopping Custom GcmJvmInitializer");
>>>   }
>>>
>>>   @Override
>>>   public void beforeProcessing(PipelineOptions options) {
>>> System.out.println("Starting Custom GcmJvmInitializer");
>>>     ApplicationContext applicationContext =
>>>         new AnnotationConfigApplicationContext(AppConfig.class);
>>> for (String beanName : applicationContext.getBeanDefinitionNames()) {
>>>   System.out.println(beanName);
>>> }
>>> System.out.println("Stopping Custom GcmJvmInitializer");
>>>   }
>>>
>>> }
>>>
>>>
>>> Thanks,
>>>
>>> Jitendra
>>>
>>>
>>> On Thu, Oct 10, 2019 at 6:35 PM Luke Cwik  wrote:
>>>
 You shouldn't need to call it before running the pipeline as you are
 doing (you can if you want, but it's not necessary).

 Have you created a META-INF service entry for the JvmInitializer you
 have created, or are you using @AutoService?
 This is the relevant bit of the documentation[1]. Here are some good
 docs for how to use @AutoService[2].

 1:
 https://github.com/apache/beam/blob/f3ce8669b50837d48ab0d0ee9a1298ce3b5bc61c/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java#L30
 2: https://github.com/google/auto/tree/master/service
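
 As a minimal sketch of the @AutoService registration (hedged; the
 GcmJvmInitializer name is taken from this thread), note that the annotation
 value must be the service interface, not the implementing class:

 import com.google.auto.service.AutoService;
 import org.apache.beam.sdk.harness.JvmInitializer;
 import org.apache.beam.sdk.options.PipelineOptions;

 // Generates META-INF/services/org.apache.beam.sdk.harness.JvmInitializer
 // containing this class's fully qualified name.
 @AutoService(JvmInitializer.class) // not GcmJvmInitializer.class
 public class GcmJvmInitializer implements JvmInitializer {
   @Override
   public void beforeProcessing(PipelineOptions options) {
     // Spring context bootstrap would go here.
   }
 }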


 On Thu, Oct 10, 2019 at 5:29 PM Jitendra kumavat <
 jkumavat1...@gmail.com> wrote:

> Hi Luke,
>
> I tried the JvmInitializer.beforeProcessing method to initialize the
> Spring application context, but it does not seem to be working.
>
>
> *Below are my class definitions.*
>
> JvmInitializer class with context initialization.
>
> public class GcmJvmInitializer implements JvmInitializer {
>   @Override
>   public void beforeProcessing(PipelineOptions options){
> System.out.println("Starting Custom GcmJvmInitializer");
>     ApplicationContext applicationContext =
>         new AnnotationConfigApplicationContext(AppConfig.class);
> for (String beanName : applicationContext.getBeanDefinitionNames()) {
>   System.out.println(beanName);
> }
> System.out.println("Stopping Custom GcmJvmInitializer");
>   }
> }
>
>
> *AppConfig*
>
> @Configuration
> @PropertySource("classpath:gcm.properties")
> @ComponentScan(basePackages = {"com.liveramp.intl.gcm"})
> public class AppConfig {
>
>   // B

Python vs Java SDK Performance

2019-10-14 Thread Shannon Duncan
Has anyone done any testing around the performance difference of Python SDK
vs Java SDK on Google Dataflow?

We recently dropped our requirement for sequence files in our pipeline,
which opens the door to using the Python SDK instead of the Java SDK. But my
concern is loss of performance.

In Java we control our serialization very carefully between pipeline items,
and my fear is losing that control in Python, so I'm curious about the
serialization speed of generic Python items like dictionaries, lists, and
tuples in the context of Dataflow.
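
For context, here's a minimal sketch of the kind of explicit coder control I
mean in the Java SDK (a toy pipeline; the String payload stands in for our
real record types):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class CoderControlExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // Pin the exact byte encoding instead of relying on coder inference;
    // this per-PCollection control is what we would give up with Python's
    // default pickling of dicts, lists, and tuples.
    PCollection<String> items =
        p.apply(Create.of("a", "b", "c")).setCoder(StringUtf8Coder.of());
    p.run().waitUntilFinish();
  }
}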

Thanks!
Shannon Duncan


Re: Beam discarding massive amount of events due to Window object or inner processing

2019-10-14 Thread Tim Sell
You're getting 1 shard per pane, and you get a pane every time the trigger
fires early, plus another one for the final on-time pane. To get 1 file with
1 shard for every 15-minute window, you need to fire only on window close,
i.e. AfterWatermark.pastEndOfWindow() without early firings.
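
A sketch of that windowing setup (hedged: the element type, the lateness
value, and the name `events` are placeholders):

import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Assuming an input PCollection<String> called events:
PCollection<String> windowed =
    events.apply(
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(15)))
            // Fire exactly once per window, when the watermark passes its
            // end, so each window produces a single on-time pane.
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.standardMinutes(5)) // placeholder
            .discardingFiredPanes());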

On Mon, 14 Oct 2019, 14:35 Eddy G,  wrote:

> Thanks a lot, everyone, for your valuable feedback!
>
> Just updated my code and made some minor refactoring, and it seems to be
> working like a charm. Still some data being dropped due to lateness (but I'm
> talking about 100 elements per 2 million, so no "big deal" there; I will
> take a look into extending lateness and the overall performance bits that
> I'm missing).
>
> A thing that worries me a lot is that the wall time has been increasing
> exponentially, up to 1 day and 3 hours in the stage that is in charge of
> writing all that captured data into Parquet files, supposedly due to the
> .parquet file writing code.
>
> I suppose that this is also the reason why I still get tons of small
> Parquet files within the same bucket, as in a perfect scenario I should
> only have 4 files (1 every 15 minutes, given the Window object length),
> when I'm currently getting 60+!
>
> .apply("Write .parquet File(s)",
> FileIO
> .writeDynamic()
> .by((SerializableFunction)
> event -> {
> // specify partitioning here
> })
> .via(ParquetIO.sink(AVRO_SCHEMA))
> .to(options.getOutputDirectory())
> .withNaming(type -> ParquetFileNaming.getNaming(...))
> .withDestinationCoder(StringUtf8Coder.of())
> .withNumShards(1) // should this be 0? Could this
> imply increasing of costs if set to 0?
>