Hi,
I tried removing the returns() call, but then the program fails with
an error (curious, since the return value is a Double).
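
The error text is not quoted here, so the following is only a guess at the cause. Flink's type extraction relies on reflection over generic signatures, and a generic wrapper class such as InferenceSelectMapper<T, R> only declares the type variable R, not Double. The plain-Java sketch below (no Flink involved; ErasureSketch, GenericMapper and declaredOutputType are hypothetical names made up for illustration) shows what reflection can and cannot see in the two cases:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureSketch {

    // Hypothetical stand-in for a generic wrapper like InferenceSelectMapper<T, R>.
    public static class GenericMapper<T, R> implements Function<T, R> {
        private final Function<T, R> delegate;

        public GenericMapper(Function<T, R> delegate) {
            this.delegate = delegate;
        }

        @Override
        public R apply(T value) {
            return delegate.apply(value);
        }
    }

    // Returns the output type argument that the object's class declares for
    // java.util.function.Function, or null if none is declared.
    public static Type declaredOutputType(Function<?, ?> f) {
        for (Type iface : f.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) iface;
                if (p.getRawType() == Function.class) {
                    return p.getActualTypeArguments()[1];
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The anonymous class fixes both type arguments in its class signature.
        Function<String, Double> direct = new Function<String, Double>() {
            @Override
            public Double apply(String s) {
                return (double) s.length();
            }
        };
        // The wrapper's class signature only mentions the type variables T and R.
        Function<String, Double> wrapped = new GenericMapper<>(direct);

        // Concrete class: Double is recoverable by reflection.
        System.out.println(declaredOutputType(direct));
        // Only a type variable: the concrete type was erased.
        System.out.println(declaredOutputType(wrapped));
    }
}
```

If this is indeed what happens inside the library, the explicit returns(returnType) call would be needed precisely because the wrapper hides the Double, which could explain why removing it fails even though select() returns a Double.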

I'm absolutely sure env.execute() is called because I see other streams
printed.

The program is connected: I followed exactly the example shown in the
library. I think this is a bug, and I need to solve it.

The inference stream on which the map function is called is definitely
not empty.

Thank you


Fabian Hueske-2 wrote
> Hi Andrea,
> 
> a MapFunction calls its map() function for each stream element and returns
> exactly one result value.
> MapFunctions are used for 1-to-1 transformations.
> The returns() method allows you to specify the return type of an operator, in
> your case the MapOperator. It is only necessary if Flink cannot
> automatically determine the return type of an operator.
> 
> It's not easy to identify what is going on from the code you posted.
> Are you sure the program is executed, i.e., did you call env.execute()?
> Are all parts of the program connected?
> Are you sure that the input stream of the Map operator emits records?
> 
> Best, Fabian
> 
> 
> 2017-09-02 19:23 GMT+02:00 AndreaKinn <kinn6aer@>:
> 
>> Hi,
>> Excuse the unclear title, but I don't know how to summarise the
>> question.
>> I'm using an external library integrated with Flink called Flink-HTM. It
>> is still a prototype.
>> Internally, it performs everything I want, but I have a problem returning
>> the evaluated values in a printable datastream.
>> I'm posting my question here because I believe the problem is tied to
>> Flink and not to the library.
>>
>> Essentially I have the following code in my main:
>>
>> */DataStream<Double> result = HTM.learn(kafkaStream, new Harness.AnomalyNetwork())
>>         .select(new InferenceSelectFunction<Harness.KafkaRecord, Double>() {
>>             @Override
>>             public Double select(Tuple2<Harness.KafkaRecord, NetworkInference> inference) throws Exception {
>>                 return inference.f1.getAnomalyScore();
>>             }
>>         });/*
>>
>> Then I want to print the datastream "result".
>> Following the /learn/ method, the flink-htm lib correctly performs many
>> operations on the data.
>> At the end of this computation, in another class, I have a
>> /DataStream<Tuple2<T, NetworkInference>>/ and essentially I have to call the
>> overridden "/select/" method on that /DataStream<Tuple2<T, NetworkInference>>/.
>>
>> The code which would do that is:
>>
>> */final DataStream<Tuple2<T, NetworkInference>> inferenceStream =
>>         inferenceStreamBuilder.build();
>>
>> return inferenceStream
>>         .map(new InferenceSelectMapper<T, R>(clean(inferenceSelectFunction)))
>>         .returns(returnType);/*
>>
>> where the /map/ and /returns/ methods are defined in Flink's
>> /DataStream.class/:
>>
>> */public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
>>
>>     TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
>>             Utils.getCallLocationName(), true);
>>
>>     return transform("Map", outType, new StreamMap<>(clean(mapper)));
>> }/*
>>
>> */public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) {
>>     requireNonNull(typeInfo, "TypeInformation must not be null");
>>
>>     transformation.setOutputType(typeInfo);
>>     return this;
>> }/*
>>
>> while /InferenceSelectMapper<T, R>/ is the following class:
>>
>> */private static class InferenceSelectMapper<T, R> implements
>>         MapFunction<Tuple2<T, NetworkInference>, R> {
>>
>>     private final InferenceSelectFunction<T, R> inferenceSelectFunction;
>>
>>     public InferenceSelectMapper(InferenceSelectFunction<T, R> inferenceSelectFunction) {
>>         this.inferenceSelectFunction = inferenceSelectFunction;
>>     }
>>
>>     @Override
>>     public R map(Tuple2<T, NetworkInference> value) throws Exception {
>>         return inferenceSelectFunction.select(value);
>>     }
>> }/*
>>
>> which implements Flink's /MapFunction/. I absolutely need the program to
>> call the /InferenceSelectMapper.map()/ method so that my "/select/"
>> function is called; unfortunately this doesn't happen. As a consequence,
>> I suppose the /DataStream result/ in the main method is never filled and
>> no output is printed in the IDE console, which is my fundamental problem.
>>
>> Since I'm not a Flink expert, I don't know how to perform many operations
>> at a "lower level".
>> Honestly, I don't understand exactly what the /map/ and /returns/ methods of
>> /DataStream.class/ do. I thought a lot about it and also tried to find a
>> way to call the /InferenceSelectMapper.map()/ method, but I don't know how
>> to extract the /Tuple2<T, NetworkInference>/ from the
>> /DataStream<Tuple2<...>>/.
>>
>> I'm absolutely sure that the /map/ function I need in
>> /InferenceSelectMapper/ is not called, because it doesn't appear in the call
>> hierarchy and a print statement I added to it never shows any output.
>>
>> Please, can you help me solve this? I've been stuck on it for a week,
>> and the lib's owner doesn't reply to my mails.
>> Sorry for the length.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>





