[ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=160697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-160697
 ]

ASF GitHub Bot logged work on BEAM-4461:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Oct/18 16:07
            Start Date: 30/Oct/18 16:07
    Worklog Time Spent: 10m 
      Work Description: akedin commented on a change in pull request #6883: 
[BEAM-4461] Resubmit PR to switch SQL over to schema transforms
URL: https://github.com/apache/beam/pull/6883#discussion_r229372889
 
 

 ##########
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/AggregationCombineFnAdapter.java
 ##########
 @@ -17,77 +17,146 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.transform.agg;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.extensions.sql.impl.UdafImpl;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations;
-import 
org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationArgsAdapter.ArgsAdapter;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
-import org.apache.calcite.util.Pair;
 
-/**
- * Wrapper {@link CombineFn} for aggregation function call.
- *
- * <p>Delegates to the actual aggregation {@link CombineFn}, either built-in, 
or UDAF.
- *
- * <p>Actual aggregation {@link CombineFn CombineFns} expect their specific 
arguments, not the full
- * input row. This class uses {@link ArgsAdapter arg adapters} to extract and 
map the call arguments
- * to the {@link CombineFn CombineFn's} inputs.
- */
-public class AggregationCombineFnAdapter extends CombineFn<Row, Object, 
Object> {
+/** Wrapper {@link CombineFn}s for aggregation function calls. */
+public class AggregationCombineFnAdapter<T> {
+  private abstract static class WrappedCombinerBase<T> extends CombineFn<T, 
Object, Object> {
+    CombineFn<T, Object, Object> combineFn;
+
+    WrappedCombinerBase(CombineFn<T, Object, Object> combineFn) {
+      this.combineFn = combineFn;
+    }
+
+    @Override
+    public Object createAccumulator() {
+      return combineFn.createAccumulator();
+    }
+
+    @Override
+    public Object addInput(Object accumulator, T input) {
+      T processedInput = getInput(input);
+      return (processedInput == null)
+          ? accumulator
+          : combineFn.addInput(accumulator, getInput(input));
+    }
 
-  // Field for a function call
-  private Schema.Field field;
+    @Override
+    public Object mergeAccumulators(Iterable<Object> accumulators) {
+      return combineFn.mergeAccumulators(accumulators);
+    }
 
-  // Actual aggregation CombineFn
-  private CombineFn combineFn;
+    @Override
+    public Object extractOutput(Object accumulator) {
+      return combineFn.extractOutput(accumulator);
+    }
 
-  // Adapter to convert input Row to CombineFn's arguments
-  private ArgsAdapter argsAdapter;
+    @Nullable
+    abstract T getInput(T input);
 
-  /** {@link Schema.Field} with this function call. */
-  public Schema.Field field() {
-    return field;
+    @Override
+    public Coder<Object> getAccumulatorCoder(CoderRegistry registry, Coder<T> 
inputCoder)
+        throws CannotProvideCoderException {
+      return combineFn.getAccumulatorCoder(registry, inputCoder);
+    }
   }
 
-  private AggregationCombineFnAdapter(
-      Schema.Field field, CombineFn combineFn, ArgsAdapter argsAdapter) {
-    this.field = field;
-    this.combineFn = combineFn;
-    this.argsAdapter = argsAdapter;
+  private static class MultiInputCombiner extends WrappedCombinerBase<Row> {
+    MultiInputCombiner(CombineFn<Row, Object, Object> combineFn) {
+      super(combineFn);
+    }
+
+    @Override
+    Row getInput(Row input) {
+      for (Object o : input.getValues()) {
+        if (o == null) {
+          return null;
+        }
+      }
+      return input;
+    }
   }
 
-  /**
-   * Creates an instance of {@link AggregationCombineFnAdapter}.
-   *
-   * @param callWithAlias Calcite's output, represents a function call paired 
with its field alias
-   */
-  public static AggregationCombineFnAdapter of(
-      Pair<AggregateCall, String> callWithAlias, Schema inputSchema) {
-    AggregateCall call = callWithAlias.getKey();
-    Schema.Field field = CalciteUtils.toField(callWithAlias.getValue(), 
call.getType());
-    String functionName = call.getAggregation().getName();
-
-    return new AggregationCombineFnAdapter(
-        field,
-        createCombineFn(call, field, functionName),
-        AggregationArgsAdapter.of(call.getArgList(), inputSchema));
+  private static class SingleInputCombiner extends WrappedCombinerBase<Object> 
{
+    SingleInputCombiner(CombineFn<Object, Object, Object> combineFn) {
+      super(combineFn);
+    }
+
+    @Override
+    Object getInput(Object input) {
+      return input;
+    }
+  }
+
+  private static class ConstantEmpty extends CombineFn<Row, Row, Row> {
+    private static final Schema EMPTY_SCHEMA = Schema.builder().build();
+    private static final Row EMPTY_ROW = Row.withSchema(EMPTY_SCHEMA).build();
+
+    public static final ConstantEmpty INSTANCE = new ConstantEmpty();
+
+    @Override
+    public Row createAccumulator() {
+      return EMPTY_ROW;
+    }
+
+    @Override
+    public Row addInput(Row accumulator, Row input) {
+      return EMPTY_ROW;
+    }
+
+    @Override
+    public Row mergeAccumulators(Iterable<Row> accumulators) {
+      return EMPTY_ROW;
+    }
+
+    @Override
+    public Row extractOutput(Row accumulator) {
+      return EMPTY_ROW;
+    }
+
+    @Override
+    public Coder<Row> getAccumulatorCoder(CoderRegistry registry, Coder<Row> 
inputCoder)
+        throws CannotProvideCoderException {
+      return SchemaCoder.of(EMPTY_SCHEMA);
+    }
+
+    @Override
+    public Coder<Row> getDefaultOutputCoder(CoderRegistry registry, Coder<Row> 
inputCoder) {
+      return SchemaCoder.of(EMPTY_SCHEMA);
+    }
   }
 
   /** Creates either a UDAF or a built-in {@link CombineFn}. */
-  private static CombineFn<?, ?, ?> createCombineFn(
+  public static CombineFn<?, ?, ?> createCombineFn(
       AggregateCall call, Schema.Field field, String functionName) {
+    CombineFn combineFn;
     if (call.getAggregation() instanceof SqlUserDefinedAggFunction) {
-      return getUdafCombineFn(call);
+      combineFn = getUdafCombineFn(call);
+    } else {
+      combineFn = BeamBuiltinAggregations.create(functionName, 
field.getType().getTypeName());
+    }
+    if (call.getArgList().isEmpty()) {
+      return new SingleInputCombiner(combineFn);
 
 Review comment:
   `ConstantEmpty` here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 160697)
    Time Spent: 15h 20m  (was: 15h 10m)

> Create a library of useful transforms that use schemas
> ------------------------------------------------------
>
>                 Key: BEAM-4461
>                 URL: https://issues.apache.org/jira/browse/BEAM-4461
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 15h 20m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to