reuvenlax commented on a change in pull request #13318:
URL: https://github.com/apache/beam/pull/13318#discussion_r525369407
##########
File path:
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -197,85 +195,60 @@ private JoinDoFn(String name, int maxAuctionsWaitingTime)
{
@ProcessElement
public void processElement(
ProcessContext c,
Review comment:
Instead of ProcessContext, just inject @Element and OutputReceiver
parameters.
##########
File path:
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -115,19 +109,23 @@ public Query3(NexmarkConfiguration configuration) {
"OR".equals(person.state)
|| "ID".equals(person.state)
|| "CA".equals(person.state)))
+ .apply(
Review comment:
Ditto - simpler to use WithKeys
##########
File path:
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -83,27 +75,29 @@ public Query3(NexmarkConfiguration configuration) {
@Override
public PCollection<NameCityStateId> expand(PCollection<Event> events) {
- int numEventsInPane = 30;
-
- PCollection<Event> eventsWindowed =
- events.apply(
- Window.<Event>into(new GlobalWindows())
- .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(numEventsInPane)))
- .discardingFiredPanes()
- .withAllowedLateness(Duration.ZERO));
- PCollection<KV<Long, Auction>> auctionsBySellerId =
- eventsWindowed
+ PCollection<KV<Long, Event>> auctionsBySellerId =
+ events
// Only want the new auction events.
.apply(NexmarkQueryUtil.JUST_NEW_AUCTIONS)
// We only want auctions in category 10.
.apply(name + ".InCategory", Filter.by(auction -> auction.category == 10))
// Key auctions by their seller id.
- .apply("AuctionBySeller", NexmarkQueryUtil.AUCTION_BY_SELLER);
-
- PCollection<KV<Long, Person>> personsById =
- eventsWindowed
+ .apply(
Review comment:
Simpler to do
.apply("EventByAuctionSeller",
WithKeys.of(Auction::seller).withKeyType(TypeDescriptors.longs()))
##########
File path:
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -197,85 +195,60 @@ private JoinDoFn(String name, int maxAuctionsWaitingTime)
{
@ProcessElement
public void processElement(
ProcessContext c,
- @TimerId(PERSON_STATE_EXPIRING) Timer timer,
- @StateId(PERSON) ValueState<Person> personState,
- @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) {
+ @TimerId(STATE_EXPIRING) Timer timer,
+ @StateId(PERSON) @AlwaysFetched ValueState<Person> personState,
+ @StateId(AUCTIONS) BagState<Auction> auctionsState) {
// We would *almost* implement this by rewindowing into the global window and
// running a combiner over the result. The combiner's accumulator would be the
// state we use below. However, combiners cannot emit intermediate results, thus
- // we need to wait for the pending ReduceFn API.
+ // we need to wait for the pending ReduceFn API
Person existingPerson = personState.read();
- if (existingPerson != null) {
- // We've already seen the new person event for this person id.
- // We can join with any new auctions on-the-fly without needing any
- // additional persistent state.
- for (Auction newAuction : c.element().getValue().getAll(NexmarkQueryUtil.AUCTION_TAG)) {
- newAuctionCounter.inc();
- newOldOutputCounter.inc();
- c.output(KV.of(newAuction, existingPerson));
- }
- return;
- }
-
- Person theNewPerson = null;
- for (Person newPerson : c.element().getValue().getAll(NexmarkQueryUtil.PERSON_TAG)) {
- if (theNewPerson == null) {
- theNewPerson = newPerson;
+ if (c.element().getValue().newPerson != null) {
Review comment:
Please add a code comment here - this is because Event is a union object, etc.
##########
File path:
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
##########
@@ -53,6 +56,48 @@
public class Query5 extends NexmarkQueryTransform<AuctionCount> {
private final NexmarkConfiguration configuration;
+ public static class TopCombineFn
+ extends AccumulatingCombineFn<KV<Long, Long>, Accum, KV<Long,
List<Long>>> {
+ @Override
+ public Accum createAccumulator() {
+ return new Accum();
+ }
+
+ @DefaultCoder(AvroCoder.class)
Review comment:
AvroCoder tends to be very inefficient. Either write a custom coder or
use SchemaCoder. To use SchemaCoder, return the following from
getAccumulatorCoder (unfortunately, it appears that combiners have not yet been
integrated with the SchemaRegistry; otherwise it would be much simpler -
just annotating with @DefaultSchema):
JavaFieldSchema provider = new JavaFieldSchema();
TypeDescriptor<Accum> typeDescriptor = new TypeDescriptor<Accum>() {};
return SchemaCoder.of(provider.schemaFor(typeDescriptor),
provider.toRowFunction(typeDescriptor),
provider.fromRowFunction(typeDescriptor));
##########
File path:
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -197,85 +195,60 @@ private JoinDoFn(String name, int maxAuctionsWaitingTime)
{
@ProcessElement
public void processElement(
ProcessContext c,
- @TimerId(PERSON_STATE_EXPIRING) Timer timer,
- @StateId(PERSON) ValueState<Person> personState,
- @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) {
+ @TimerId(STATE_EXPIRING) Timer timer,
+ @StateId(PERSON) @AlwaysFetched ValueState<Person> personState,
+ @StateId(AUCTIONS) BagState<Auction> auctionsState) {
// We would *almost* implement this by rewindowing into the global window and
// running a combiner over the result. The combiner's accumulator would be the
// state we use below. However, combiners cannot emit intermediate results, thus
- // we need to wait for the pending ReduceFn API.
+ // we need to wait for the pending ReduceFn API
Person existingPerson = personState.read();
- if (existingPerson != null) {
- // We've already seen the new person event for this person id.
- // We can join with any new auctions on-the-fly without needing any
- // additional persistent state.
- for (Auction newAuction : c.element().getValue().getAll(NexmarkQueryUtil.AUCTION_TAG)) {
- newAuctionCounter.inc();
- newOldOutputCounter.inc();
- c.output(KV.of(newAuction, existingPerson));
- }
- return;
- }
-
- Person theNewPerson = null;
- for (Person newPerson : c.element().getValue().getAll(NexmarkQueryUtil.PERSON_TAG)) {
- if (theNewPerson == null) {
- theNewPerson = newPerson;
+ if (c.element().getValue().newPerson != null) {
+ Person person = c.element().getValue().newPerson;
+ if (existingPerson == null) {
+ newPersonCounter.inc();
+ personState.write(person);
} else {
- if (theNewPerson.equals(newPerson)) {
- LOG.error("Duplicate person {}", theNewPerson);
+ if (person.equals(existingPerson)) {
+ LOG.error("Duplicate person {}", person);
} else {
- LOG.error("Conflicting persons {} and {}", theNewPerson, newPerson);
+ LOG.error("Conflicting persons {} and {}", existingPerson, person);
}
fatalCounter.inc();
- continue;
}
- newPersonCounter.inc();
// We've now seen the person for this person id so can flush any
// pending auctions for the same seller id (an auction is done by only one seller).
Review comment:
If this is not the first time this person has been seen, there's no
point in even fetching auctionsState, right?
##########
File path:
sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
##########
@@ -164,21 +165,19 @@ public void processElement(ProcessContext c) {
@StateId(PERSON)
private static final StateSpec<ValueState<Person>> personSpec =
StateSpecs.value(Person.CODER);
- private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
+ private static final String STATE_EXPIRING = "stateExpiring";
@StateId(AUCTIONS)
- private final StateSpec<ValueState<List<Auction>>> auctionsSpec =
- StateSpecs.value(ListCoder.of(Auction.CODER));
+ private final StateSpec<BagState<Auction>> auctionsSpec = StateSpecs.bag(Auction.CODER);
- @TimerId(PERSON_STATE_EXPIRING)
- private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+ @TimerId(STATE_EXPIRING)
Review comment:
Why are you making the expiration timer a processing-time timer? Isn't
event time more correct here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]