[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15287916#comment-15287916
 ] 

ASF GitHub Bot commented on APEXMALHAR-2006:
--------------------------------------------

Github user davidyan74 commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/261#discussion_r63624391
  
    --- Diff: stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java ---
    @@ -0,0 +1,361 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.stream.api.operator;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.stream.api.function.Function;
    +import org.apache.commons.io.IOUtils;
    +
    +import 
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
    +import 
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
    +import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.netlet.util.DTThrowable;
    +
    +/**
    + * Operators that wrap the functions
    + */
    +public class FunctionOperator<OUT> implements Operator
    +{
    +  private byte[] annonymousFunctionClass;
    +
    +  protected transient Function statelessF;
    +
    +  protected Function statefulF;
    +
    +  protected boolean stateful = false;
    +
    +  protected boolean isAnnonymous = false;
    +
    +  public final transient DefaultOutputPort<OUT> output = new 
DefaultOutputPort<>();
    +
    +  public FunctionOperator(Function f)
    +  {
    +    isAnnonymous = f.getClass().isAnonymousClass();
    +    if (isAnnonymous) {
    +      annonymousFunctionClass = functionClassData(f);
    +    } else if (f instanceof Function.Stateful) {
    +      statelessF = f;
    +    } else {
    +      statefulF = f;
    +      stateful = true;
    +    }
    +  }
    +
    +  private byte[] functionClassData(Function f)
    +  {
    +    Class<Function> classT = (Class<Function>)f.getClass();
    +
    +    byte[] classBytes = null;
    +    byte[] classNameBytes = null;
    +    String className = classT.getName();
    +    try {
    +      classNameBytes = className.replace('.', '/').getBytes();
    +      classBytes = 
IOUtils.toByteArray(classT.getClassLoader().getResourceAsStream(className.replace('.',
 '/') + ".class"));
    +      int cursor = 0;
    +      for (int j = 0; j < classBytes.length; j++) {
    +        if (classBytes[j] != classNameBytes[cursor]) {
    +          cursor = 0;
    +        } else {
    +          cursor++;
    +        }
    +
    +        if (cursor == classNameBytes.length) {
    +          for (int p = 0; p < classNameBytes.length; p++) {
    +            if (classBytes[j - p] == '$') {
    +              classBytes[j - p] = '_';
    +            }
    +          }
    +          cursor = 0;
    +        }
    +      }
    +      ClassReader cr = new ClassReader(new 
ByteArrayInputStream(classBytes));
    +      ClassWriter cw = new ClassWriter(0);
    +      AnnonymousClassModifier annonymousClassModifier = new 
AnnonymousClassModifier(Opcodes.ASM4, cw);
    +      cr.accept(annonymousClassModifier, 0);
    +      classBytes = cw.toByteArray();
    +    } catch (IOException e) {
    +      throw new RuntimeException(e);
    +    }
    +    int dataLength = classNameBytes.length + 4 + 4;
    +
    +    ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream(dataLength);
    +    DataOutputStream output = new DataOutputStream(byteArrayOutputStream);
    +
    +    try {
    +      output.writeInt(classNameBytes.length);
    +      output.write(className.replace('$', '_').getBytes());
    +      output.writeInt(classBytes.length);
    +      output.write(classBytes);
    +    } catch (IOException e) {
    +      DTThrowable.rethrow(e);
    +    } finally {
    +      try {
    +        output.flush();
    +        output.close();
    +      } catch (IOException e) {
    +        DTThrowable.rethrow(e);
    --- End diff --
    
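    DTThrowable shouldn't be used here; wrap and rethrow the IOException as a RuntimeException instead, as the earlier catch block in this method already does.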
     Add a line note



> Stream API Design
> -----------------
>
>                 Key: APEXMALHAR-2006
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2006
>             Project: Apache Apex Malhar
>          Issue Type: Sub-task
>            Reporter: Siyuan Hua
>            Assignee: Siyuan Hua
>             Fix For: 3.4.0
>
>
> Construct DAG in a similar way as Flink/Spark Streaming



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to