Accumulator with Map field in CombineFn not serializing correctly
Hi all,

In my Beam job I have defined my own CombineFn with an accumulator. Running locally is no problem, but when I run the job on Dataflow I hit an Avro serialization exception:

    java.lang.NoSuchMethodException: java.util.Map.<init>()
        at java.lang.Class.getConstructor0(Class.java:3082)
        at java.lang.Class.getDeclaredConstructor(Class.java:2178)
        at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)

I am using the `@DefaultCoder(AvroCoder.class)` annotation for my accumulator class. Is there anything special I need to do because one of the fields in my accumulator class is a Map? I have pasted an outline of my CombineFn below. Thanks for any help with this!

Josh

    private static class MyCombineFn extends CombineFn {

      private static class ExpiringLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
          return this.size() > 10;
        }
      }

      @DefaultCoder(AvroCoder.class)
      private static class PartialEventUpdate implements Serializable {
        Long incrementCountBy = 0L;
        Map recentEvents = new ExpiringLinkedHashMap<>();
        Long lastSeenMillis = 0L;

        PartialEventUpdate() {}
      }

      @DefaultCoder(AvroCoder.class)
      private static class Accum implements Serializable {
        Map eventIdToUpdate = new HashMap<>();

        Accum() {}
      }

      @Override
      public MyCombineFn.Accum createAccumulator() {
        return new MyCombineFn.Accum();
      }

      ...
    }
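The `NoSuchMethodException` in the trace is Avro's reflect machinery failing to find a no-arg constructor on the declared field type: `Map` is an interface, so it has none. A stdlib-only sketch of that failure mode, with no Beam or Avro required (the class name here is made up for illustration, and whether declaring the field as a concrete type fully fixes a given AvroCoder setup is an assumption to verify):

```java
import java.util.HashMap;
import java.util.Map;

public class MapReflectionDemo {
    public static void main(String[] args) throws Exception {
        // Avro's SpecificData.newInstance effectively does this: look up the
        // no-arg constructor of the declared field type via reflection.
        try {
            Map.class.getDeclaredConstructor();
        } catch (NoSuchMethodException e) {
            // An interface has no constructors, hence the exception above.
            System.out.println("Map: " + e.getClass().getSimpleName());
        }

        // A concrete type such as HashMap does have a no-arg constructor,
        // so reflection-based instantiation succeeds.
        Map<?, ?> ok = HashMap.class.getDeclaredConstructor().newInstance();
        System.out.println("HashMap instantiated, empty = " + ok.isEmpty());
    }
}
```

This suggests the field declarations themselves (`Map recentEvents`, `Map eventIdToUpdate`) are what Avro chokes on, independent of the values assigned to them.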
Re: Scio 0.9.3 released
Just upgraded to Scio 0.9.2 / Beam 2.22 and the stack is not up-to-date again ;) Thanks for the update (Scio and Beam teams)!

On Thu, 6 Aug 2020 at 02:09, Robert Bradshaw wrote:
> Thanks for the update!
>
> On Wed, Aug 5, 2020 at 11:46 AM Neville Li wrote:
> >
> > Hi all,
> >
> > We just released Scio 0.9.3. This bumps the Beam SDK to 2.23.0 and includes a lot of improvements & bug fixes.
> >
> > Cheers,
> > Neville
> >
> > https://github.com/spotify/scio/releases/tag/v0.9.3
> >
> > "Petrificus Totalus"
> >
> > There are no breaking changes in this release, but some were introduced with v0.9.0.
> > See the v0.9.0 Migration Guide for detailed instructions.
> >
> > Improvements
> >
> > Allow user-supplied filename prefix for smb writes/reads (#3215)
> > Refactor SortedBucketTransform into a BoundedSource + reuse merge logic (#3097)
> > Add keyGroupFilter optimization to scio-smb (#3160)
> > Add error message to BaseAsyncLookupDoFn preconditions check (#3176)
> > Add Elasticsearch 5,6,7 add/update alias on multiple indices ops (#3134)
> > Add initial update alias op to ES7 (#2920)
> > Add ScioContext#applyTransform (#3146)
> > Allow SCollection#transform name override (#3142)
> > Allow setting default name through SCollection#applyTransform (#3144)
> > Update 0.9 migration doc and add Bigquery Type read schema documentation (#3148)
> >
> > Bug Fixes
> >
> > AvroBucketMetadata should validate keyPath (fix #3038) (#3140)
> > Allow union types in non leaf field for key (#3187)
> > Fix issue with union type as non-leaf field of smb key (#3193)
> > Fix ContextAndArgs#typed overloading issue (#3199)
> > Fix error propagation on Scala Future onSuccess callback (#3178)
> > Fix ByteBuffer should be readOnly (#3220)
> > Fix compiler warnings (#3183)
> > Fix JdbcShardedReadOptions.fetchSize description (#3209)
> > Fix FAQ typo (#3194)
> > Fix scalafix error in SortMergeBucketScioContextSyntax (#3158)
> > Add scalafix ExplicitReturnType and ProcedureSyntax rules (#3179)
> > Cleanup a few more unused and unchecked params (#3223)
> > Use GcpOptions#getWorkerZone instead of deprecated GcpOptions#getZone (#3224)
> > Use raw coder in SCollection#applyKvTransform (#3171)
> > Add raw beam coder materializer (#3164)
> > Avoid circular dep between SCollection and PCollectionWrapper (#3163)
> > Remove unused param of internal partitionFn (#3166)
> > Remove unused CoderRegistry (#3165)
> > Remove defunct scio-bench (#3150)
> > Reuse applyTransform (#3162)
> > Make multijoin.py python3 (#3145)
> > Use TextIO#withCompression (#3145)
> >
> > Dependency Updates
> >
> > Update Beam SDK to 2.23.0 (#3197)
> > Update dependencies to be inline with 2.23.0 (#3225)
> > Update to scala 2.12.12 (#3157)
> > Update auto-value to 1.7.4 (#3147)
> > Update breeze to 1.1 (#3211)
> > Update cassandra-all to 3.11.7 (#3186)
> > Update cassandra-driver-core to 3.10.0 (#3195)
> > Update commons-lang3 to 3.11 (#3161)
> > Update commons-text to 1.9 (#3185)
> > Update contributing guidelines with current tools (#3149)
> > Update elasticsearch-rest-client, ... to 7.8.1 (#3192)
> > Update elasticsearch, ... to 6.8.11 (#3188)
> > Update jackson-module-scala to 2.10.5 (#3169)
> > Update jna to 5.6.0 (#3156)
> > Update magnolify to 0.2.2 (#3154)
> > Update mysql-connector-java to 8.0.21 (#3153)
> > Update pprint to 0.6.0 (#3203)
> > Update protobuf version to 3.11.4 (#3200)
> > Update sbt-scalafix to 0.9.18 (#3138)
> > Update sbt-sonatype to 3.9.4 (#3136)
> > Update scalafmt-core to 2.6.2 (#3139)
> > Update scalafmt-core to 2.6.3 (#3152)
> > Update scalafmt-core to 2.6.4 (#3167)
> > Update sparkey to 3.1.0 (#3204)
> > Fix conflicting gcsio dependency (#3180)
Session Windowing followed by CoGroupByKey
Hello All,

I need advice on whether Session Windowing followed by CoGroupByKey is the correct way to solve my use case and, if yes, whether I am using it correctly. Please note that I am using the Java SDK 2.19 on Google Dataflow.

I have two streams of data coming from two different Kafka topics, and I need to correlate them using the common key present in both of them. I expect all the logs for a key to arrive within 90 seconds in both topics, and hence I decided to use a session window.

1. Read data from the Kafka topics like the following:

    PCollection<KV<String, POJO1>> collection1 = p
        .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(servers)
            .withTopics(Arrays.asList("topic1"))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
        .apply("Applying_Fixed_Window",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardSeconds(360))
                .discardingFiredPanes())
        .apply("Convert_KafkaRecord_To_PCollection", ParDo.of(new ParseLogsPOJO1()));

    PCollection<KV<String, POJO2>> collection2 = p
        .apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(servers)
            .withTopics(Arrays.asList("topic2"))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
        .apply("Applying_Fixed_Window",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardSeconds(360))
                .discardingFiredPanes())
        .apply("Convert_KafkaRecord_To_PCollection", ParDo.of(new ParseLogsPOJO2()));

2. Put each of the above collections in a session window with a gap period of 90 seconds:

    PCollection<KV<String, POJO1>> sessionWindowedPOJO1 = collection1
        .apply("Applying_Session_Window",
            Window.<KV<String, POJO1>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());

    PCollection<KV<String, POJO2>> sessionWindowedPOJO2 = collection2
        .apply("Applying_Session_Window",
            Window.<KV<String, POJO2>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());

3. CoGroupByKey and get correlated logs:

    PCollection coGbkLogs =
        KeyedPCollectionTuple.of(tag1, sessionWindowedPOJO1)
            .and(tag2, sessionWindowedPOJO2)
            .apply("CoGroupByMyKey", CoGroupByKey.create())
            .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));

Is this a correct way to solve my use case? Looking forward to hearing from someone soon.

Thanks and Regards
Mohil
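As a sanity check on the windowing semantics in the pipeline above: fixed windows bucket each element by its event timestamp, while session windows keep merging elements (per key) whose timestamps fall within the gap of one another. A minimal stdlib sketch of both rules, with illustrative class and method names (Beam's real implementations are FixedWindows and Sessions, which do this via window assignment and merging):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WindowingSketch {
    // Start of the fixed window containing a timestamp, e.g. 10-second windows.
    static long fixedWindowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    // Group timestamps (millis) into sessions: a new session starts whenever
    // the gap since the previous timestamp exceeds gapMillis.
    static List<List<Long>> sessions(List<Long> timestamps, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<List<Long>> out = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMillis) {
                out.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            out.add(current);
        }
        return out;
    }

    public static void main(String[] args) {
        // 10-second fixed windows: t=3s and t=9s share [0s, 10s); t=12s falls in [10s, 20s).
        System.out.println(fixedWindowStart(12_000L, 10_000L)); // 10000

        // 90-second session gap: events at 0s, 60s, and 120s chain into one
        // session, while an event at 300s is >90s after 120s and opens a new one.
        List<List<Long>> s = sessions(List.of(0L, 60_000L, 120_000L, 300_000L), 90_000L);
        System.out.println(s.size()); // 2
    }
}
```

This also hints at why CoGroupByKey needs both inputs to use an identical windowing strategy: the tagged inputs are grouped together per key, and session windows from both topics merge with each other, so logs from both sides that fall within one 90-second session land in the same CoGbkResult.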
Tampermonkey script for GCP Dataflow console with enhanced view for finding job bottlenecks
It could be useful for finding Dataflow job bottlenecks, at least for a first quick look at the console. Beware of dragons behind the coloured steps ;)

https://github.com/mkuthan/gcp-dataflow-tampermonkey

Many thanks to Tomek Nurkiewicz, the author of the initial version of the script!

M.