Accumulator with Map field in CombineFn not serializing correctly

2020-08-06 Thread Josh
Hi all,

In my Beam job I have defined my own CombineFn with an accumulator. Running
locally is no problem, but when I run the job on Dataflow I hit an Avro
serialization exception:
java.lang.NoSuchMethodException: java.util.Map.<init>()
java.lang.Class.getConstructor0(Class.java:3082)
java.lang.Class.getDeclaredConstructor(Class.java:2178)
org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)

I am using the `@DefaultCoder(AvroCoder.class)` annotation for my
accumulator class. Is there anything special I need to do because one of
the fields in my accumulator class is a Map? I have pasted an outline of my
CombineFn below.

Thanks for any help with this!

Josh

private static class MyCombineFn extends CombineFn {

    private static class ExpiringLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return this.size() > 10;
        }
    }

    @DefaultCoder(AvroCoder.class)
    private static class PartialEventUpdate implements Serializable {
        Long incrementCountBy = 0L;
        Map recentEvents = new ExpiringLinkedHashMap<>();
        Long lastSeenMillis = 0L;

        PartialEventUpdate() {}
    }

    @DefaultCoder(AvroCoder.class)
    private static class Accum implements Serializable {
        Map eventIdToUpdate = new HashMap<>();

        Accum() {}
    }

    @Override
    public MyCombineFn.Accum createAccumulator() {
        return new MyCombineFn.Accum();
    }

    ...

}
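
A possible workaround (a sketch only, not a confirmed fix for the trace above):
since Accum already implements Serializable, the CombineFn can override
getAccumulatorCoder and return a SerializableCoder, which bypasses Avro
reflection on the Map field entirely. MyEvent below is a hypothetical stand-in
for the CombineFn's real input type, which is elided in the outline above.

    // Inside MyCombineFn. Needs org.apache.beam.sdk.coders.Coder,
    // org.apache.beam.sdk.coders.CoderRegistry and
    // org.apache.beam.sdk.coders.SerializableCoder on the import list.
    @Override
    public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<MyEvent> inputCoder) {
        // Accum is Serializable, so this sidesteps Avro's attempt to find a
        // no-arg constructor for the java.util.Map interface.
        return SerializableCoder.of(Accum.class);
    }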


Re: Scio 0.9.3 released

2020-08-06 Thread Marcin Kuthan
Just upgraded to Scio 0.9.2 / Beam 2.22 and the stack is not up-to-date
again ;)

Thanks for the update (Scio and Beam teams)!

On Thu, 6 Aug 2020 at 02:09, Robert Bradshaw  wrote:

> Thanks for the update!
>
> On Wed, Aug 5, 2020 at 11:46 AM Neville Li  wrote:
> >
> > Hi all,
> >
> > We just released Scio 0.9.3. This bumps Beam SDK to 2.23.0 and includes a lot of improvements & bug fixes.
> >
> > Cheers,
> > Neville
> >
> > https://github.com/spotify/scio/releases/tag/v0.9.3
> >
> > "Petrificus Totalus"
> >
> > There are no breaking changes in this release, but some were introduced
> with v0.9.0:
> >
> > See v0.9.0 Migration Guide for detailed instructions.
> >
> > Improvements
> >
> > Allow user-supplied filename prefix for smb writes/reads (#3215)
> > Refactor SortedBucketTransform into a BoundedSource + reuse merge logic (#3097)
> > Add keyGroupFilter optimization to scio-smb (#3160)
> > Add error message to BaseAsyncLookupDoFn preconditions check (#3176)
> > Add Elasticsearch 5,6,7 add/update alias on multiple indices ops (#3134)
> > Add initial update alias op to ES7 (#2920)
> > Add ScioContext#applyTransform (#3146)
> > Allow SCollection#transform name override (#3142)
> > Allow setting default name through SCollection#applyTransform (#3144)
> > Update 0.9 migration doc and add Bigquery Type read schema documentation (#3148)
> >
> > Bug Fixes
> >
> > AvroBucketMetadata should validate keyPath (fix #3038) (#3140)
> > Allow union types in non leaf field for key (#3187)
> > Fix issue with union type as non-leaf field of smb key (#3193)
> > Fix ContextAndArgs#typed overloading issue (#3199)
> > Fix error propagation on Scala Future onSuccess callback (#3178)
> > Fix ByteBuffer should be readOnly (#3220)
> > Fix compiler warnings (#3183)
> > Fix JdbcShardedReadOptions.fetchSize description (#3209)
> > Fix FAQ typo (#3194)
> > Fix scalafix error in SortMergeBucketScioContextSyntax (#3158)
> > Add scalafix ExplicitReturnType and ProcedureSyntax rules (#3179)
> > Cleanup a few more unused and unchecked params (#3223)
> > Use GcpOptions#getWorkerZone instead of deprecated GcpOptions#getZone (#3224)
> > Use raw coder in SCollection#applyKvTransform (#3171)
> > Add raw beam coder materializer (#3164)
> > Avoid circular dep between SCollection and PCollectionWrapper (#3163)
> > Remove unused param of internal partitionFn (#3166)
> > Remove unused CoderRegistry (#3165)
> > Remove defunct scio-bench (#3150)
> > Reuse applyTransform (#3162)
> > Make multijoin.py python3
> > Use TextIO#withCompression (#3145)
> >
> > Dependency Updates
> >
> > Update Beam SDK to 2.23.0 (#3197)
> > Update dependencies to be inline with 2.23.0 (#3225)
> > Update to scala 2.12.12 (#3157)
> > Update auto-value to 1.7.4 (#3147)
> > Update breeze to 1.1 (#3211)
> > Update cassandra-all to 3.11.7 (#3186)
> > Update cassandra-driver-core to 3.10.0 (#3195)
> > Update commons-lang3 to 3.11 (#3161)
> > Update commons-text to 1.9 (#3185)
> > Update contributing guidelines with current tools (#3149)
> > Update elasticsearch-rest-client, ... to 7.8.1 (#3192)
> > Update elasticsearch, ... to 6.8.11 (#3188)
> > Update jackson-module-scala to 2.10.5 (#3169)
> > Update jna to 5.6.0 (#3156)
> > Update magnolify to 0.2.2 (#3154)
> > Update mysql-connector-java to 8.0.21 (#3153)
> > Update pprint to 0.6.0 (#3203)
> > Update protobuf version to 3.11.4 (#3200)
> > Update sbt-scalafix to 0.9.18 (#3138)
> > Update sbt-sonatype to 3.9.4 (#3136)
> > Update scalafmt-core to 2.6.2 (#3139)
> > Update scalafmt-core to 2.6.3 (#3152)
> > Update scalafmt-core to 2.6.4 (#3167)
> > Update sparkey to 3.1.0 (#3204)
> > Fix conflicting gcsio dependency (#3180)
>


Session Windowing followed by CoGroupByKey

2020-08-06 Thread Mohil Khare
Hello All,

I would like advice on whether session windowing followed by CoGroupByKey is
a correct way to solve my use case and, if so, whether I am using it
correctly.
Please note that I am using the Java SDK 2.19 on Google Dataflow.

I have two streams of data coming from two different Kafka topics, and I
need to correlate them using the common key present in both of them. I
expect all the logs for a key to arrive within 90 seconds on both topics,
so I decided to use a session window.

1. Read data from each Kafka topic:

   PCollection<KV<String, POJO1>> collection1 =
       p.apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
               .withBootstrapServers(servers)
               .withTopics(Arrays.asList("topic1"))
               .withKeyDeserializer(StringDeserializer.class)
               .withValueDeserializer(ByteArrayDeserializer.class)
               .withConsumerConfigUpdates(kafkaConsumerProperties)
               .withConsumerFactoryFn(consumerFactoryObj)
               .commitOffsetsInFinalize())
           .apply("Applying_Fixed_Window",
               Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                   .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                   .withAllowedLateness(Duration.standardSeconds(360))
                   .discardingFiredPanes())
           .apply("Convert_KafkaRecord_To_PCollection", ParDo.of(new ParseLogsPOJO1()));


   PCollection<KV<String, POJO2>> collection2 =
       p.apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
               .withBootstrapServers(servers)
               .withTopics(Arrays.asList("topic2"))
               .withKeyDeserializer(StringDeserializer.class)
               .withValueDeserializer(ByteArrayDeserializer.class)
               .withConsumerConfigUpdates(kafkaConsumerProperties)
               .withConsumerFactoryFn(consumerFactoryObj)
               .commitOffsetsInFinalize())
           .apply("Applying_Fixed_Window",
               Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                   .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                   .withAllowedLateness(Duration.standardSeconds(360))
                   .discardingFiredPanes())
           .apply("Convert_KafkaRecord_To_PCollection", ParDo.of(new ParseLogsPOJO2()));


2. Put each of the above collections into a session window with a 90-second gap
duration:


   PCollection<KV<String, POJO1>> sessionWindowedPOJO1 =
       collection1
           .apply("Applying_Session_Window",
               Window.<KV<String, POJO1>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                   .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                   .withAllowedLateness(Duration.ZERO)
                   .discardingFiredPanes());

   PCollection<KV<String, POJO2>> sessionWindowedPOJO2 =
       collection2
           .apply("Applying_Session_Window",
               Window.<KV<String, POJO2>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
                   .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                   .withAllowedLateness(Duration.ZERO)
                   .discardingFiredPanes());


3. CoGroupByKey and get the correlated logs:


   final TupleTag<POJO1> tag1 = new TupleTag<>("tag1");
   final TupleTag<POJO2> tag2 = new TupleTag<>("tag2");

   PCollection coGbkLogs =
       KeyedPCollectionTuple.of(tag1, sessionWindowedPOJO1)
           .and(tag2, sessionWindowedPOJO2)
           .apply("CoGroupByMyKey", CoGroupByKey.create())
           .apply("Correlate_Logs_PerLogID", ParDo.of(new Correlate()));
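
(For reference, a minimal sketch of how a Correlate DoFn can read back both
sides of the CoGbkResult using the same TupleTags; the POJO1/POJO2 element
types and the String output type are assumptions for illustration, and this
variant passes the tags in through the constructor, i.e.
ParDo.of(new Correlate(tag1, tag2)):)

   private static class Correlate extends DoFn<KV<String, CoGbkResult>, String> {

       private final TupleTag<POJO1> tag1;
       private final TupleTag<POJO2> tag2;

       Correlate(TupleTag<POJO1> tag1, TupleTag<POJO2> tag2) {
           this.tag1 = tag1;
           this.tag2 = tag2;
       }

       @ProcessElement
       public void processElement(ProcessContext c) {
           String key = c.element().getKey();
           CoGbkResult joined = c.element().getValue();

           // All logs observed for this key, per topic, within the same session window.
           Iterable<POJO1> topic1Logs = joined.getAll(tag1);
           Iterable<POJO2> topic2Logs = joined.getAll(tag2);

           // ... correlate topic1Logs with topic2Logs and emit the result
           c.output(key);
       }
   }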




   Is this a correct way to solve my use case?


Looking forward to hearing from someone soon.


Thanks and Regards

Mohil


Tampermonkey script for GCP Dataflow console with enhanced view for finding job bottlenecks

2020-08-06 Thread Marcin Kuthan
It could be useful for finding Dataflow job bottlenecks, at least for a first
quick look at the console. Be aware of dragons behind the coloured steps ;)

https://github.com/mkuthan/gcp-dataflow-tampermonkey

Many thanks to Tomek Nurkiewicz, the founder and author of the initial
version of the script!

M.