[
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=160787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-160787
]
ASF GitHub Bot logged work on BEAM-4461:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Oct/18 19:03
Start Date: 30/Oct/18 19:03
Worklog Time Spent: 10m
Work Description: reuvenlax commented on a change in pull request #6883:
[BEAM-4461] Resubmit PR to switch SQL over to schema transform
URL: https://github.com/apache/beam/pull/6883#discussion_r229445303
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/AggregationCombineFnAdapter.java
##########
@@ -17,77 +17,146 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.transform.agg;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.extensions.sql.impl.UdafImpl;
import
org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations;
-import
org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationArgsAdapter.ArgsAdapter;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
-import org.apache.calcite.util.Pair;
-/**
- * Wrapper {@link CombineFn} for aggregation function call.
- *
- * <p>Delegates to the actual aggregation {@link CombineFn}, either built-in,
or UDAF.
- *
- * <p>Actual aggregation {@link CombineFn CombineFns} expect their specific
arguments, not the full
- * input row. This class uses {@link ArgsAdapter arg adapters} to extract and
map the call arguments
- * to the {@link CombineFn CombineFn's} inputs.
- */
-public class AggregationCombineFnAdapter extends CombineFn<Row, Object,
Object> {
+/** Wrapper {@link CombineFn}s for aggregation function calls. */
+public class AggregationCombineFnAdapter<T> {
+ private abstract static class WrappedCombinerBase<T> extends CombineFn<T,
Object, Object> {
+ CombineFn<T, Object, Object> combineFn;
+
+ WrappedCombinerBase(CombineFn<T, Object, Object> combineFn) {
+ this.combineFn = combineFn;
+ }
+
+ @Override
+ public Object createAccumulator() {
+ return combineFn.createAccumulator();
+ }
+
+ @Override
+ public Object addInput(Object accumulator, T input) {
+ T processedInput = getInput(input);
+ return (processedInput == null)
+ ? accumulator
+ : combineFn.addInput(accumulator, getInput(input));
+ }
- // Field for a function call
- private Schema.Field field;
+ @Override
+ public Object mergeAccumulators(Iterable<Object> accumulators) {
+ return combineFn.mergeAccumulators(accumulators);
+ }
- // Actual aggregation CombineFn
- private CombineFn combineFn;
+ @Override
+ public Object extractOutput(Object accumulator) {
+ return combineFn.extractOutput(accumulator);
+ }
- // Adapter to convert input Row to CombineFn's arguments
- private ArgsAdapter argsAdapter;
+ @Nullable
+ abstract T getInput(T input);
- /** {@link Schema.Field} with this function call. */
- public Schema.Field field() {
- return field;
+ @Override
+ public Coder<Object> getAccumulatorCoder(CoderRegistry registry, Coder<T>
inputCoder)
+ throws CannotProvideCoderException {
+ return combineFn.getAccumulatorCoder(registry, inputCoder);
+ }
}
- private AggregationCombineFnAdapter(
- Schema.Field field, CombineFn combineFn, ArgsAdapter argsAdapter) {
- this.field = field;
- this.combineFn = combineFn;
- this.argsAdapter = argsAdapter;
+ private static class MultiInputCombiner extends WrappedCombinerBase<Row> {
+ MultiInputCombiner(CombineFn<Row, Object, Object> combineFn) {
+ super(combineFn);
+ }
+
+ @Override
+ Row getInput(Row input) {
+ for (Object o : input.getValues()) {
+ if (o == null) {
Review comment:
Correct, but the returned list might contain null values.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 160787)
Time Spent: 16h (was: 15h 50m)
> Create a library of useful transforms that use schemas
> ------------------------------------------------------
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core
> Reporter: Reuven Lax
> Assignee: Reuven Lax
> Priority: Major
> Time Spent: 16h
> Remaining Estimate: 0h
>
> e.g. JoinBy(fields), Project, Filter, etc.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)