[ 
https://issues.apache.org/jira/browse/FLINK-758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14124950#comment-14124950
 ] 

ASF GitHub Bot commented on FLINK-758:
--------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/63#discussion_r17215488
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/CountOperator.java 
---
    @@ -0,0 +1,125 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators;
    +
    +import org.apache.commons.lang3.Validate;
    +import org.apache.flink.api.common.operators.UnaryOperatorInformation;
    +import org.apache.flink.api.common.operators.base.MapOperatorBase;
    +import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.functions.MapFunction;
    +import org.apache.flink.api.java.functions.ReduceFunction;
    +import org.apache.flink.api.java.typeutils.BasicTypeInfo;
    +import org.apache.flink.util.Collector;
    +
    +import java.util.Iterator;
    +
    +/**
    + * A {@link DataSet} that is the result of a count transformation.
    + * <p/>
    + * The count will be executed as a map-reduce. The map operator maps every 
element of the input to a 1 and the all
    + * reduce sums the ones up to the total count.
    + *
    + * @param <IN> The type of the data set aggregated by the operator.
    + */
    +public class CountOperator<IN> extends SingleInputUdfOperator<IN, Long, 
CountOperator<IN>> {
    +
    +   private final Grouping<IN> grouping;
    +
    +   public CountOperator(DataSet<IN> input) {
    +           super(input, BasicTypeInfo.LONG_TYPE_INFO);
    +           grouping = null;
    +   }
    +
    +   public CountOperator(Grouping<IN> input) {
    +           super(Validate.notNull(input).getDataSet(), 
BasicTypeInfo.LONG_TYPE_INFO);
    +           this.grouping = input;
    +   }
    +
    +   @Override
    +   protected org.apache.flink.api.common.operators.SingleInputOperator<?, 
Long, ?> translateToDataFlow(
    +                   org.apache.flink.api.common.operators.Operator<IN> 
input) {
    +           if (grouping == null) {
    +                   // map to ones
    +                   UnaryOperatorInformation<IN, Long> countMapOpInfo =
    +                                   new UnaryOperatorInformation<IN, 
Long>(getInputType(), BasicTypeInfo.LONG_TYPE_INFO);
    +                   MapOperatorBase<IN, Long, MapFunction<IN, Long>> 
countMapOp =
    +                                   new MapOperatorBase<IN, Long, 
MapFunction<IN, Long>>(
    +                                                   new CountingMapUdf(), 
countMapOpInfo, "Count: map to ones");
    +
    +                   countMapOp.setInput(input);
    +                   
countMapOp.setDegreeOfParallelism(input.getDegreeOfParallelism());
    +
    +                   // sum ones
    +                   UnaryOperatorInformation<Long, Long> countReduceOpInfo =
    +                                   new UnaryOperatorInformation<Long, 
Long>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    +                   ReduceOperatorBase<Long, ReduceFunction<Long>> 
countReduceOp =
    +                                   new ReduceOperatorBase<Long, 
ReduceFunction<Long>>(
    +                                                   new 
CountingReduceUdf(), countReduceOpInfo, "Count: sum ones");
    +
    +                   countReduceOp.setInput(countMapOp);
    +                   countReduceOp.setDegreeOfParallelism(1);
    +                   
countReduceOp.setInitialValue(countReduceOpInfo.getInputType().createSerializer(),
 0L);
    +
    +                   return countReduceOp;
    +           }
    +           else {
    +                   return new ReduceGroupOperator<IN, Long>(grouping, new 
CountingGroupReduceUdf<IN>())
    --- End diff --
    
    +1


> Add count method to DataSet and implement CountOperator
> -------------------------------------------------------
>
>                 Key: FLINK-758
>                 URL: https://issues.apache.org/jira/browse/FLINK-758
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: GitHub Import
>              Labels: github-import
>             Fix For: pre-apache
>
>         Attachments: pull-request-758-7518001488867571817.patch
>
>
> At the request of @twalthr. This is the count operator I've implemented some 
> time ago to get the to know the new Java API. It introduces 
> `DataSet.count()`, which is executed as a map (to ones) and reduce (sum up 
> the ones). I initially didn't do the PR, because of the following problem: 
> empty DataSets don't work as the first map won't have any input to operate on.
> If more people think that we should include this operator we can think about 
> a possible solution to the problem.
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/pull/758
> Created by: [uce|https://github.com/uce]
> Labels: enhancement, java api, 
> Milestone: Release 0.6 (unplanned)
> Created at: Tue May 06 10:42:33 CEST 2014
> State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to