Re: set state.checkpoint.dir to execution environment

2017-09-02 Thread Tony Wei
Hi Jose,

It seems that you have enabled externalized checkpoints in your streaming job.
If externalized checkpoints are really what you want, 'state.checkpoints.dir'
must be set in flink-conf.yaml. For your second question: yes, the only way is
to modify flink-conf.yaml. See the reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#directory-structure
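
As a reference, a minimal sketch of the two pieces involved could look like the
following (the paths and the interval are only placeholders, so adjust them to
your setup):

    # flink-conf.yaml
    state.checkpoints.dir: hdfs:///flink/checkpoints

    // in the job (Flink 1.3 Java API)
    // imports: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
    //          org.apache.flink.streaming.api.environment.CheckpointConfig,
    //          org.apache.flink.runtime.state.filesystem.FsStateBackend
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60000); // checkpoint every 60 s
    // keep the externalized checkpoint metadata after the job is cancelled
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints/data"));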

Best,
Tony Wei

2017-09-03 7:09 GMT+08:00 Jose Miguel Tejedor Fernandez <jose.fernan...@rovio.com>:

> Hi,
>
> After enabling checkpoints and setting the property env.setStateBackend(new
> FsStateBackend(url)), I get the following exception:
>
> Caused by: java.lang.IllegalStateException: CheckpointConfig says to
> persist periodic checkpoints, but no checkpoint directory has been
> configured. You can configure configure one via key 'state.checkpoints.dir'.
>
> I have not found any way to set 'state.checkpoints.dir' on the
> execution environment. Is it possible, or is the only way to modify the
> flink-conf.yaml file instead?
> Thanks
>
> BR
>
>


set state.checkpoint.dir to execution environment

2017-09-02 Thread Jose Miguel Tejedor Fernandez
Hi,

After enabling checkpoints and setting the property env.setStateBackend(new
FsStateBackend(url)), I get the following exception:

Caused by: java.lang.IllegalStateException: CheckpointConfig says to
persist periodic checkpoints, but no checkpoint directory has been
configured. You can configure configure one via key 'state.checkpoints.dir'.

I have not found any way to set 'state.checkpoints.dir' on the
execution environment. Is it possible, or is the only way to modify the
flink-conf.yaml file instead?
Thanks

BR


How to fill flink's datastream

2017-09-02 Thread AndreaKinn
Hi,
excuse the unclear title, but I don't know how to summarise the question better.
I'm using an external library integrated with Flink called Flink-HTM. It is
still a prototype.
Internally it performs everything I want, but I have a problem getting the
evaluated values back into a printable DataStream.
I posted my question here because I believe the problem is tied to Flink
and not to the library.

Essentially I have the following code in my main:

DataStream result = HTM.learn(kafkaStream, new Harness.AnomalyNetwork())
    .select(new InferenceSelectFunction() {
        @Override
        public Double select(Tuple2 inference) throws Exception {
            return inference.f1.getAnomalyScore();
        }
    });

Then I want to print the DataStream "result".
Following the learn method, the flink-htm lib correctly performs many
operations on the data. At the end of this computation, in another class, I
have a DataStream, and essentially the overridden "select" method has to be
called on that DataStream.

The code which would do that is:

final DataStream inferenceStream = inferenceStreamBuilder.build();

return inferenceStream
    .map(new InferenceSelectMapper(clean(inferenceSelectFunction)))
    .returns(returnType);

where the map and returns methods are defined in Flink's DataStream class.

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);

    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) {
    requireNonNull(typeInfo, "TypeInformation must not be null");

    transformation.setOutputType(typeInfo);
    return this;
}

while InferenceSelectMapper is the following class:

private static class InferenceSelectMapper implements MapFunction, R> {

    private final InferenceSelectFunction inferenceSelectFunction;

    public InferenceSelectMapper(InferenceSelectFunction inferenceSelectFunction) {
        this.inferenceSelectFunction = inferenceSelectFunction;
    }

    @Override
    public R map(Tuple2 value) throws Exception {
        return inferenceSelectFunction.select(value);
    }
}

which implements Flink's MapFunction. I absolutely need the program to call the
InferenceSelectMapper.map() method, which in turn calls my defined "select"
function; unfortunately this doesn't happen. As a consequence, I suppose the
DataStream "result" is never filled, and no output is printed in the main
method or in the IDE console, which is my fundamental problem.

Since I'm not a Flink expert, I don't know how to perform many operations at a
"lower level". Honestly, I don't understand exactly what the map and returns
methods of DataStream do. I thought a lot about it, and I also tried to find a
way to call the InferenceSelectMapper.map() method myself, but I don't know how
to extract the Tuple2 from the DataStream.
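
For reference, here is how I understand a MapFunction is supposed to be invoked
in plain Flink (a minimal standalone sketch, no flink-htm involved): map() and
returns() only build the job graph, and the user function runs only once
env.execute() actually executes the job:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapInvocationDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // map() only declares the transformation; nothing runs yet
        env.fromElements(1L, 2L, 3L)
           .map(new MapFunction<Long, Double>() {
               @Override
               public Double map(Long value) throws Exception {
                   // only called while the job is actually running
                   return value * 0.5;
               }
           })
           .print();

        // without this call the graph is never executed and map() is never invoked
        env.execute("map invocation demo");
    }
}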

I'm absolutely sure that the map function I need in InferenceSelectMapper is
not called, because it doesn't appear in the call hierarchy and a print
instruction I added there is never shown.

Please, can you help me solve this? I've been stuck on it for a week, and the
lib's owner doesn't reply to my mails.
Sorry for the length.





Use cases for Flink

2017-09-02 Thread Krishnanand Khambadkone
I have been trying out various scenarios with Flink, with various sources and
sinks.  I want to know how folks have been using Flink in production.  What are
the common use cases?  Essentially, I have implemented similar use cases in
Spark, and now I find it fairly straightforward to convert these flows to Flink.
I do see that the throughput in Flink is an order of magnitude greater.  It is
also great for sub-second batching and record-by-record processing.  Any other
thoughts?

Re: Re: part files written to HDFS with .pending extension

2017-09-02 Thread Krishnanand Khambadkone
Yes, I enabled checkpointing and now the files no longer have the .pending extension.
Thank you, Urs.

On Saturday, September 2, 2017, 3:10:28 AM PDT, Urs Schoenenberger
<urs.schoenenber...@tngtech.com> wrote:

Hi,

you need to enable checkpointing for your job. Flink uses ".pending"
extensions to mark parts that have been completely written, but are not
included in a checkpoint yet.

Once you enable checkpointing, the .pending extensions will be removed
whenever a checkpoint completes.

Regards,
Urs

On 02.09.2017 02:46, Krishnanand Khambadkone wrote:
> BTW, I am using a BucketingSink and a DateTimeBucketer.  Do I need to set
> any other property to move the files from the .pending state?
>
> BucketingSink sink = new BucketingSink("hdfs://localhost:8020/flinktwitter/");
> sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
>
> On Friday, September 1, 2017, 5:03:46 PM PDT, Krishnanand Khambadkone wrote:
>
> Hi,  I have written a small program that uses a Twitter input stream and an
> HDFS output sink.  When the files are written to HDFS, each part file in the
> directory has a .pending extension.  I am able to cat the file and see the
> tweet text.  Is it normal for the part files to have a .pending extension?
> 
> -rw-r--r--  3 user  supergroup      46399 2017-09-01 16:35 
> /flinktwitter/2017-09-01--1635/_part-0-95.pending
> 
> -rw-r--r--  3 user supergroup      54861 2017-09-01 16:35 
> /flinktwitter/2017-09-01--1635/_part-0-96.pending
> 
> -rw-r--r--  3 user supergroup      41878 2017-09-01 16:35 
> /flinktwitter/2017-09-01--1635/_part-0-97.pending
> 
> -rw-r--r--  3  user supergroup      42813 2017-09-01 16:35 
> /flinktwitter/2017-09-01--1635/_part-0-98.pending
> 
> -rw-r--r--  3  user supergroup      42887 2017-09-01 16:35 
> /flinktwitter/2017-09-01--1635/_part-0-99.pending
> 

-- 
Urs Schönenberger - urs.schoenenber...@tngtech.com

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: termination of stream#iterate on finite streams

2017-09-02 Thread Xingcan Cui
Hi Peter,

I just omitted the filter part. Sorry for that.

Actually, as the javadoc explains, by default a DataStream with an iteration
will never terminate. That's because in a streaming environment with an
iteration, the operator can never know whether the feedback stream has reached
its end (even though the data source has terminated, *there may be unknowable
subsequent data*), and that's why it needs a timeout value to make that
judgement, just like many other calls in network programming. In other words,
you know that the feedback stream will eventually be empty, but the operator
doesn't. Thus we provide it with a maximum waiting time for the next record.
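
For example, a rough Java sketch of a terminating version of your loop (the
5000 ms wait time is an arbitrary choice) could look like this:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateTimeoutDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // the 5000 ms argument is the maximum time the iteration head waits for
        // feedback records; without it the head waits forever and the job never finishes
        IterativeStream<Long> iteration = env.generateSequence(1, 4).iterate(5000);

        // feedback loop: keep decrementing the positive values
        DataStream<Long> feedback = iteration
                .filter(new FilterFunction<Long>() {
                    @Override
                    public boolean filter(Long value) {
                        return value > 0;
                    }
                })
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value - 1;
                    }
                });
        iteration.closeWith(feedback);

        // forward the values that leave the loop, just to see some output
        iteration.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) {
                return value <= 0;
            }
        }).print();

        env.execute("iterate with timeout");
    }
}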

Internally, this mechanism is implemented via a blocking queue (the related
code can be found in the Flink source).

Hope everything is considered this time : )

Best,
Xingcan

On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl  wrote:

>
> On 02.09.2017 at 04:45, Xingcan Cui wrote:
>
> In your code, all the long values will be decremented by 1 and sent back to
> the iterate operator, endlessly.
>
>
>
> Is this true? shouldn't
>
>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
>   })
>   iterationResult2.print()
>
>
> produce the following _feedback_ streams?
>
> initial input to #iterate(): [1 2 3 4]
>
> iteration #1 : [1 2 3]
> iteration #2 : [1 2]
> iteration #3 : [1]
> iteration #4 : []  => empty feedback stream => cause termination? (which
> actually only happens when setting a timeout value)
>
> Best regards
> Peter
>
>
>


Re: termination of stream#iterate on finite streams

2017-09-02 Thread Peter Ertl

> On 02.09.2017 at 04:45, Xingcan Cui wrote:
> 
> In your code, all the long values will be decremented by 1 and sent back to
> the iterate operator, endlessly.


Is this true? shouldn't
  val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
    (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
  })
  iterationResult2.print()

produce the following _feedback_ streams?

initial input to #iterate(): [1 2 3 4]

iteration #1 : [1 2 3]
iteration #2 : [1 2]
iteration #3 : [1]
iteration #4 : []  => empty feedback stream => cause termination? (which 
actually only happens when setting a timeout value)

Best regards
Peter




How to assign windows dynamically after process() operator

2017-09-02 Thread cancobanoglu
Hi all,

I'm trying to build a streaming process like the one below:

  1. collect sensor events from a source
  2. collect rule events defined for a device (which streams sensor events)
  3. rules may be defined with window information, so aggregation is processed
     differently for each device
  4. when a rule for a device with window info is seen in the stream, then
     create a (tumbling) window
  5. if a new rule comes without window info, remove the window and process
     without a window function.

I took this as a reference :
https://techblog.king.com/rbea-scalable-real-time-analytics-king/

My streaming code is below:

mappedDataSource
    .connect(mappedRuleStream)
    .keyBy(..deviceId..)
    .process(new RuleProcessorFunction())
    .windowAll(new CustomTimeWindowing())
    .apply(new AllWindowFunction() {

        @Override
        public void apply(TimeWindow window, Iterable values, Collector out) throws Exception {
            System.out.println("hello");
        }
    });

RuleProcessorFunction is:

public class RuleProcessorFunction extends CoProcessFunction {

    private transient ValueState> state;

    @Override
    public void processElement1(SensorEvent value, Context ctx, Collector out) throws Exception {
        System.out.println("process element device id : " + value.deviceId);
        System.out.println("process element solution id : " + value.solutionId);
        state.update(Tuple2.of(value, null));

        RuleEvent rule = state.value().f1;

        // execute if there is a defined rule on incoming event
    }

    @Override
    public void processElement2(RuleEvent value, Context ctx, Collector out) throws Exception {
        System.out.println("rule stream element solId :" + value.solutionId + " devId : " + value.deviceId);
        state.value().f1 = value;
        // store rule in memory
        // the processed event carries the window information, and the downstream
        // operator does the window assignment
        ProcessedEvent processedEvent = new ProcessedEvent();
        processedEvent.deviceId = value.deviceId;
        processedEvent.solutionId = value.solutionId;
        processedEvent.windowInfo = value.window;
        processedEvent.ruleId = value.ruleId;

        out.collect(processedEvent);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor> stateDescriptor =
            new ValueStateDescriptor<>("processor", TypeInformation.of(new TypeHint>() {
            }));
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
        // rule triggers
    }
}


CustomWindowAssigner is:

public class CustomTimeWindowing extends TumblingEventTimeWindows {

    public CustomTimeWindowing() {
        super(1, 0);
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        System.out.println("creating window : ");
        io.iven.stream.processing.ProcessedEvent processedEvent = (ProcessedEvent) element;
        int windowInfo = processedEvent.windowInfo;
        System.out.println("creating window rule : " + processedEvent.ruleId);
        long size = windowInfo * 1000;
        System.out.println("window info in milliseconds : " + size);
        long start = timestamp - (timestamp % size);
        long end = start + size;
        return Collections.singletonList(new TimeWindow(start, end));
    }
}

When a RuleEvent comes in, I add metadata about the window info and emit it into
the collector to keep streaming. But if I do this in processElement1 for
SensorEvent as well, then the window assigner is going to be called again and
the window will be changed. I want the assigner to kick in only when new/changed
window info arrives.

Could you guide me on how to do this? What is the correct way to build this kind
of structure: managing windows manually, or using this kind of custom window
assigner?
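
For comparison, here is a rough sketch of what I imagine the "manage windows
manually" alternative would look like, keeping the rule and a running aggregate
in keyed state and using an event-time timer instead of a window assigner (the
SensorEvent.value field and the Double output type are just assumptions for the
sketch, and it assumes event-time timestamps are assigned upstream):

// imports: org.apache.flink.api.common.state.ValueState,
//          org.apache.flink.api.common.state.ValueStateDescriptor,
//          org.apache.flink.configuration.Configuration,
//          org.apache.flink.streaming.api.functions.co.CoProcessFunction,
//          org.apache.flink.util.Collector
public class ManualWindowFunction extends CoProcessFunction<SensorEvent, RuleEvent, Double> {

    private transient ValueState<RuleEvent> currentRule; // latest rule for this device
    private transient ValueState<Double> windowSum;      // running aggregate of the open window

    @Override
    public void open(Configuration parameters) {
        currentRule = getRuntimeContext().getState(new ValueStateDescriptor<>("rule", RuleEvent.class));
        windowSum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Double.class));
    }

    @Override
    public void processElement1(SensorEvent sensor, Context ctx, Collector<Double> out) throws Exception {
        RuleEvent rule = currentRule.value();
        if (rule == null || rule.window <= 0) {
            out.collect(sensor.value); // no windowed rule: pass the value through directly
            return;
        }
        // align the record to the rule's tumbling window and aggregate it
        long size = rule.window * 1000L;
        long windowEnd = ctx.timestamp() - (ctx.timestamp() % size) + size;
        Double sum = windowSum.value();
        windowSum.update((sum == null ? 0.0 : sum) + sensor.value);
        // fires when the watermark passes the end of the current window
        ctx.timerService().registerEventTimeTimer(windowEnd);
    }

    @Override
    public void processElement2(RuleEvent rule, Context ctx, Collector<Double> out) throws Exception {
        currentRule.update(rule); // new/changed rule: just remember it for this device
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Double> out) throws Exception {
        Double sum = windowSum.value();
        if (sum != null) {
            out.collect(sum); // emit the aggregate of the window that just closed
            windowSum.clear();
        }
    }
}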

another ref :
https://stackoverflow.com/questions/34596230/differences-between-working-with-states-and-windowstime-in-flink-streaming

Thanks










Re: part files written to HDFS with .pending extension

2017-09-02 Thread Urs Schoenenberger
Hi,

you need to enable checkpointing for your job. Flink uses ".pending"
extensions to mark parts that have been completely written, but are not
included in a checkpoint yet.

Once you enable checkpointing, the .pending extensions will be removed
whenever a checkpoint completes.
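
For reference, enabling checkpointing around your BucketingSink could look
roughly like this (the interval, the paths, and the tweetStream variable are
placeholders for your own setup):

// imports: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
//          org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink,
//          org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// finished parts lose their .pending suffix each time a checkpoint completes
env.enableCheckpointing(30000);

BucketingSink<String> sink = new BucketingSink<>("hdfs://localhost:8020/flinktwitter/");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
tweetStream.addSink(sink);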

Regards,
Urs

On 02.09.2017 02:46, Krishnanand Khambadkone wrote:
> BTW, I am using a BucketingSink and a DateTimeBucketer.  Do I need to set
> any other property to move the files from the .pending state?
>
> BucketingSink sink = new BucketingSink("hdfs://localhost:8020/flinktwitter/");
> sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
>
> On Friday, September 1, 2017, 5:03:46 PM PDT, Krishnanand Khambadkone wrote:
>
> Hi,  I have written a small program that uses a Twitter input stream and an
> HDFS output sink.  When the files are written to HDFS, each part file in the
> directory has a .pending extension.  I am able to cat the file and see the
> tweet text.  Is it normal for the part files to have a .pending extension?
> 
> -rw-r--r--   3 user  supergroup  46399 2017-09-01 16:35 
> /flinktwitter/2017-09-01--1635/_part-0-95.pending
> 
> -rw-r--r--   3 user supergroup  54861 2017-09-01 16:35 
> /flinktwitter/2017-09-01--1635/_part-0-96.pending
> 
> -rw-r--r--   3 user supergroup  41878 2017-09-01 16:35 
> /flinktwitter/2017-09-01--1635/_part-0-97.pending
> 
> -rw-r--r--   3  user supergroup  42813 2017-09-01 16:35 
> /flinktwitter/2017-09-01--1635/_part-0-98.pending
> 
> -rw-r--r--   3  user supergroup  42887 2017-09-01 16:35 
> /flinktwitter/2017-09-01--1635/_part-0-99.pending
> 

-- 
Urs Schönenberger - urs.schoenenber...@tngtech.com

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082