[
https://issues.apache.org/jira/browse/FLINK-758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14124904#comment-14124904
]
ASF GitHub Bot commented on FLINK-758:
--------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/63#discussion_r17214869
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/operators/CountOperator.java
---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * A {@link DataSet} that is the result of a count transformation.
+ * <p/>
+ * The count will be executed as a map-reduce. The map operator maps every element of the input to a 1 and the all
+ * reduce sums the ones up to the total count.
+ *
+ * @param <IN> The type of the data set aggregated by the operator.
+ */
+public class CountOperator<IN> extends SingleInputUdfOperator<IN, Long, CountOperator<IN>> {
+
+    private final Grouping<IN> grouping;
+
+    public CountOperator(DataSet<IN> input) {
+        super(input, BasicTypeInfo.LONG_TYPE_INFO);
+        grouping = null;
+    }
+
+    public CountOperator(Grouping<IN> input) {
+        super(Validate.notNull(input).getDataSet(), BasicTypeInfo.LONG_TYPE_INFO);
+        this.grouping = input;
+    }
+
+    @Override
+    protected org.apache.flink.api.common.operators.SingleInputOperator<?, Long, ?> translateToDataFlow(
+            org.apache.flink.api.common.operators.Operator<IN> input) {
+        if (grouping == null) {
+            // map to ones
+            UnaryOperatorInformation<IN, Long> countMapOpInfo =
+                    new UnaryOperatorInformation<IN, Long>(getInputType(), BasicTypeInfo.LONG_TYPE_INFO);
+            MapOperatorBase<IN, Long, MapFunction<IN, Long>> countMapOp =
+                    new MapOperatorBase<IN, Long, MapFunction<IN, Long>>(
+                            new CountingMapUdf(), countMapOpInfo, "Count: map to ones");
+
+            countMapOp.setInput(input);
+            countMapOp.setDegreeOfParallelism(input.getDegreeOfParallelism());
+
+            // sum ones
+            UnaryOperatorInformation<Long, Long> countReduceOpInfo =
+                    new UnaryOperatorInformation<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+            ReduceOperatorBase<Long, ReduceFunction<Long>> countReduceOp =
+                    new ReduceOperatorBase<Long, ReduceFunction<Long>>(
+                            new CountingReduceUdf(), countReduceOpInfo, "Count: sum ones");
+
+            countReduceOp.setInput(countMapOp);
+            countReduceOp.setDegreeOfParallelism(1);
+            countReduceOp.setInitialValue(countReduceOpInfo.getInputType().createSerializer(), 0L);
+
+            return countReduceOp;
+        }
+        else {
+            return new ReduceGroupOperator<IN, Long>(grouping, new CountingGroupReduceUdf<IN>())
--- End diff --
Using a non-combinable GroupReduceFunction for counting is unnecessarily
inefficient.
We could instead use a Mapper to extract the key fields and attach a count
of 1 to each record, and then sum the counts with a ReduceFunction.
This requires handling a few cases because of the different key types, but
it should be the way to go.
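
A minimal plain-Java sketch of the idea in the comment above, for illustration
only. It does not use any Flink classes; the class name CombinableCountSketch
and the methods combine and reduce are hypothetical. Each element is mapped to
(key, 1) and the ones are summed per key. Because the sum is associative, each
partition can be pre-aggregated before the final reduce, which is what a
non-combinable group reduce cannot do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java illustration only; this is not the Flink API. */
final class CombinableCountSketch {

    /** "Combine" step: pre-aggregates one partition into partial counts per key. */
    static <T, K> Map<K, Long> combine(List<T> partition, Function<T, K> keyExtractor) {
        Map<K, Long> partial = new HashMap<>();
        for (T element : partition) {
            // map: element -> (key, 1L); reduce: sum the ones of equal keys
            partial.merge(keyExtractor.apply(element), 1L, Long::sum);
        }
        return partial;
    }

    /** Final reduce: merges the partial counts with the same associative sum. */
    static <K> Map<K, Long> reduce(List<Map<K, Long>> partials) {
        Map<K, Long> total = new HashMap<>();
        for (Map<K, Long> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        // Two hypothetical partitions of the same grouped data set.
        List<String> partitionA = Arrays.asList("a", "b", "a");
        List<String> partitionB = Arrays.asList("b", "a");

        List<Map<String, Long>> partials = new ArrayList<>();
        partials.add(combine(partitionA, Function.identity()));
        partials.add(combine(partitionB, Function.identity()));

        // Only the small partial maps reach the final reduce, not every element.
        System.out.println(reduce(partials));
    }
}
```

In this sketch the key is the element itself; the different key types mentioned
in the comment would correspond to different keyExtractor functions.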
> Add count method to DataSet and implement CountOperator
> -------------------------------------------------------
>
> Key: FLINK-758
> URL: https://issues.apache.org/jira/browse/FLINK-758
> Project: Flink
> Issue Type: Improvement
> Reporter: GitHub Import
> Labels: github-import
> Fix For: pre-apache
>
> Attachments: pull-request-758-7518001488867571817.patch
>
>
> At the request of @twalthr. This is the count operator I implemented some
> time ago to get to know the new Java API. It introduces
> `DataSet.count()`, which is executed as a map (to ones) and reduce (sum up
> the ones). I initially didn't open the PR because of the following problem:
> empty DataSets don't work, as the first map won't have any input to operate on.
> If more people think that we should include this operator, we can think about
> a possible solution to the problem.
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/pull/758
> Created by: [uce|https://github.com/uce]
> Labels: enhancement, java api,
> Milestone: Release 0.6 (unplanned)
> Created at: Tue May 06 10:42:33 CEST 2014
> State: open
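
A small plain-Java sketch of the empty-input problem described in the issue
text, for illustration only. java.util.stream stands in for the map and reduce
operators, and the class and method names are hypothetical. The diff quoted in
this message calls setInitialValue(..., 0L) on the final reduce; whether that
fully resolves the problem in Flink is not stated here. The sketch only shows
why a reduce needs a starting value to produce a count of 0 for empty input.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Plain-Java illustration only; this is not the Flink API. */
final class EmptyCountSketch {

    /** Map to ones, then reduce without an initial value: empty input yields no result. */
    static <T> Optional<Long> countWithoutInitialValue(List<T> input) {
        return input.stream().map(element -> 1L).reduce(Long::sum);
    }

    /** Map to ones, then reduce starting from 0: empty input yields 0. */
    static <T> long countWithInitialValue(List<T> input) {
        return input.stream().map(element -> 1L).reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        List<String> empty = Collections.emptyList();
        System.out.println(countWithoutInitialValue(empty).isPresent()); // prints false
        System.out.println(countWithInitialValue(empty));                // prints 0
    }
}
```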