Window Function on AllWindowed Stream - Combining Kafka Topics

2017-05-02 Thread G.S.Vijay Raajaa
Hi,

I am trying to combine two Kafka topics using a single Kafka consumer
on a list of topics, and further convert the JSON string in the stream to a POJO.
Then I join them via keyBy (on the event time field), and to merge them into a
single fat JSON I was planning to use a window stream and apply a window
function on it. The assumption is that Topic-A & Topic-B can
be joined on event time, and only one pair (Topic A JSON, Topic B
JSON) will be present with the same eventTime. Hence I was planning to use
a countWindow(2) after the keyBy on eventTime.

I have a couple of questions about this:

1. Is the approach fine for merging the topics and creating a single JSON?
2. The window function on the AllWindowed stream doesn't seem to work; any
pointers will be greatly appreciated.

Code snippet:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

logger.info("Flink Stream Window Charger has started");

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
properties.setProperty("group.id", "group-0011");
properties.setProperty("auto.offset.reset", "smallest");

List<String> names = new ArrayList<>();
names.add("Topic-A");
names.add("Topic-B");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));

DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());

List<String> where = new ArrayList<String>();

AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);

DataStream<String> data_charging = data_window.apply(new MyWindowFunction());

data_charging.addSink(new SinkFunction<String>() {

  public void invoke(String value) throws Exception {
    // Yet to be implemented - merge the two POJOs into one
  }
});

try {
  env.execute();
} catch (Exception e) {
  return;
}
}
}

class Tokenizer implements FlatMapFunction<TopicPojo, String> {

  private static final long serialVersionUID = 1L;

  @Override
  public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    out.collect(mapper.writeValueAsString(value));
  }
}

class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {

  @Override
  public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
      throws Exception {
    int count = 0;
    for (TopicPojo in : arg2) {
      count++;
    }
    // Test result - to be modified
    out.collect("Window: " + window + " count: " + count);
  }
}

class Deserializer implements MapFunction<String, TopicPojo> {

  private static final long serialVersionUID = 1L;

  @Override
  public TopicPojo map(String value) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    TopicPojo obj = null;
    try {
      System.out.println(value);
      obj = mapper.readValue(value, TopicPojo.class);
    } catch (JsonParseException e) {
      throw new IOException("Failed to deserialize JSON object.");
    } catch (JsonMappingException e) {
      throw new IOException("Failed to deserialize JSON object.");
    } catch (IOException e) {
      throw new IOException("Failed to deserialize JSON object.");
    }
    return obj;
  }
}

I am getting the error: "The method apply(AllWindowFunction)
in the type AllWindowedStream is not applicable for
the arguments (MyWindowFunction)".
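
From the AllWindowedStream javadoc, apply() seems to expect an AllWindowFunction
rather than a WindowFunction. A minimal sketch that I assume would compile against
data_window (it only counts elements, not the final merge logic) would be:

// Minimal sketch: countWindowAll() produces an AllWindowedStream, so apply()
// expects an AllWindowFunction, which has no key parameter.
class MyAllWindowFunction implements AllWindowFunction<String, String, GlobalWindow> {

  @Override
  public void apply(GlobalWindow window, Iterable<String> values, Collector<String> out) throws Exception {
    int count = 0;
    for (String ignored : values) {
      count++;
    }
    out.collect("Window: " + window + " count: " + count);
  }
}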

Kindly give your input.

Regards,
Vijay Raajaa GS


Join two kafka topics

2017-05-02 Thread Tarek khal
I have two Kafka topics (tracking and rules) and I would like to join the
"tracking" datastream with the "rules" datastream as the data arrives in the
"tracking" datastream.

Here is the result that I expect, but without restarting the job (here I had to
restart the job to get this result):

[images not included in the archive]

Code:
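
Roughly, what I have in mind (a minimal sketch with made-up Tracking, Rule and
Alert types, not my actual job) is to connect the two streams and keep the
latest rule per key in state, so that new rules take effect without restarting:

// Minimal sketch (made-up Tracking, Rule and Alert types): keep the latest
// rule per key in state and evaluate each tracking event against it on arrival.
tracking.connect(rules)
        .keyBy(t -> t.deviceId, r -> r.deviceId)
        .flatMap(new RichCoFlatMapFunction<Tracking, Rule, Alert>() {

            private transient ValueState<Rule> currentRule;

            @Override
            public void open(Configuration parameters) {
                currentRule = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("rule", Rule.class));
            }

            @Override
            public void flatMap1(Tracking event, Collector<Alert> out) throws Exception {
                Rule rule = currentRule.value();
                if (rule != null && rule.matches(event)) {
                    out.collect(new Alert(event, rule));
                }
            }

            @Override
            public void flatMap2(Rule rule, Collector<Alert> out) throws Exception {
                currentRule.update(rule); // a new rule takes effect immediately, no restart
            }
        });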





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Join-two-kafka-topics-tp12943.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: RocksDB error with flink 1.2.0

2017-05-02 Thread Aljoscha Krettek
Hi,
I think the bottleneck might be HDFS. With 300 operators with parallelism 
6 you will have 1800 concurrent writes (i.e. connections) to HDFS, which might 
be too much for the master node and the worker nodes.

This is the same problem that you had on the local filesystem but now in the 
distributed filesystem.

Best,
Aljoscha

> On 28. Apr 2017, at 22:15, mclendenin  wrote:
> 
> There are only 3 nodes in the HDFS cluster and when running fsck it shows the
> filesystem as healthy.
> 
> $ hdfs fsck
> /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43/
> 17/04/28 16:24:59 WARN util.NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> Connecting to namenode via
> http://localhost/fsck?ugi=hadoop&path=%2Fuser%2Fhadoop%2Fflink%2Fcheckpoints%2Fdc2aee563bebce76e420029525c37892%2Fchk-43
> FSCK started by hadoop (auth:SIMPLE) from / for path
> /user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43 at
> Fri Apr 28 16:25:00 EDT 2017
> .Status: HEALTHY
> Total size:   33197 B
> Total dirs:   1
> Total files:  5
> Total symlinks:   0 (Files currently being written: 460)
> Total blocks (validated): 5 (avg. block size 6639 B)
> Minimally replicated blocks:  5 (100.0 %)
> Over-replicated blocks:   0 (0.0 %)
> Under-replicated blocks:  0 (0.0 %)
> Mis-replicated blocks:0 (0.0 %)
> Default replication factor:   2
> Average block replication:3.0
> Corrupt blocks:   0
> Missing replicas: 0 (0.0 %)
> Number of data-nodes: 3
> Number of racks:  1
> FSCK ended at Fri Apr 28 16:25:00 EDT 2017 in 13 milliseconds
> 
> 
> The filesystem under path
> '/user/hadoop/flink/checkpoints/dc2aee563bebce76e420029525c37892/chk-43' is
> HEALTHY
> 
> 
> 



Re: Behavior of the cancel command

2017-05-02 Thread Aljoscha Krettek
Thanks for looking into this, Jürgen!

I opened a Jira issue: https://issues.apache.org/jira/browse/FLINK-6427 


> On 29. Apr 2017, at 09:15, Jürgen Thomann  
> wrote:
> 
> Hi Aljoscha,
> 
> In my case the valid-length file created contains a value which e.g. says 100 
> MB are valid to read for exactly-once, but the file with the data is only 95 
> MB large. As I understand it, the valid-length file contains the length of the 
> file at the checkpoint. This also does not happen for all files (3 HDFS sinks, 
> each with a parallelism of 2). For some parts the file size and the value in 
> the valid-length file match exactly. 
> 
> After looking over the checkpoint code in the BucketingSink, I looked into the 
> hsync behavior again and found the following page: 
> http://stackoverflow.com/questions/32231105/why-is-hsync-not-flushing-my-hdfs-file
> 
> After this I downloaded the file with the hdfs dfs tool and actually the file 
> is now even larger than the valid-length file. I checked this against the 
> things I did before (Impala and Hive select count queries, and Hue download of 
> files and wc -l) and these 3 ways result in the same number of lines, but hdfs 
> dfs -cat  | wc -l gives a much larger value. 
> 
> So my conclusion would be that the data is written and not exactly lost as I 
> thought, but for my use case not visible because the files are not properly 
> closed during cancel and the namenode is not aware of the flushed data. So I 
> could imagine 2 ways out of this: 1. implement the hsync as stated at the 
> Stack Overflow page or 2. ensure that files are properly closed during cancel.
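> 
> For reference, the workaround from that Stack Overflow page boils down to calling
> hsync with the UPDATE_LENGTH flag so that the NameNode also learns the new file
> length (a minimal sketch, assuming the wrapped stream really is an HdfsDataOutputStream):
> 
> import java.io.IOException;
> import java.util.EnumSet;
> import org.apache.hadoop.fs.FSDataOutputStream;
> import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
> import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
> 
> // A plain hsync() flushes the data to the DataNodes but does not update the
> // file length on the NameNode, so readers still see the old size.
> static void syncAndUpdateLength(FSDataOutputStream out) throws IOException {
>     if (out.getWrappedStream() instanceof HdfsDataOutputStream) {
>         ((HdfsDataOutputStream) out.getWrappedStream())
>                 .hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
>     } else {
>         out.hsync(); // fall back to a regular sync
>     }
> }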
> 
> Best,
> Jürgen
> 
> On 28.04.2017 17:38, Aljoscha Krettek wrote:
>> Hi Jürgen,
>> Is there missing data with respect to what should have been written at the 
>> time of the cancel or when the last checkpoint (or in that case, the 
>> savepoint) was performed? I’m asking because the cancel command is only sent 
>> out once the savepoint has been completed, as can be seen at [1]. If the 
>> savepoint is complete this also means that the snapshot method of the 
>> BucketingSink must have done its work, i.e. that it also flushed all files, 
>> which is done in [2]. There’s always the possibility of a bug, however, so 
>> we’ll have to look into this together.
>> 
>> Best,
>> Aljoscha
>> 
>> [1] 
>> https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607
>>  
>> 
>> 
>> [2] 
>> https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
>>  
>> 
>> 
>>> On 27. Apr 2017, at 10:27, Jürgen Thomann wrote:
>>> 
>>> Hi,
>>> 
>>> Some time ago I had problems with writing data to Hadoop with the 
>>> BucketingSink and losing data in case of a cancel with savepoint, because the 
>>> flush/sync command was interrupted. I tried changing Hadoop settings as 
>>> suggested but had no luck in the end and looked into the Flink code. If I 
>>> understand the code correctly, it behaves the following way:
>>> 
>>> 1. Start a Watchdog thread if we have a cancellation timeout set
>>> 2. invoke cancel on the sink/task, but do not wait for it to finish
>>> 3. destroy the buffer pool and release resources
>>> 4. send initial interrupt to the sink/task
>>> 5. call join on the sink/task and ignore InterruptedException
>>> 6. let the watchdog send more interrupts if needed and throw fatal error if 
>>> timeout is reached
>>> 
>>> In my case the BucketingSink does not have enough time to flush everything 
>>> before the initial interrupt is sent, and some files are not closed properly, 
>>> which causes the missing data in Hadoop, in my understanding.
>>> 
>>> Is my understanding correct and if yes, do you know a way to get around 
>>> this behavior to let the close function finish the sync for all files?
>>> 
>>> Best,
>>> Jürgen
>> 



Re: CEP timeout occurs even for a successful match when using followedBy

2017-05-02 Thread Moiz S Jinia
Thanks! I downloaded and built 1.3-SNAPSHOT locally and was able to verify
that followedBy now works as I want.
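
For the record, what I verified looks roughly like this (a sketch against the
1.3-SNAPSHOT CEP API, assuming an Event POJO with a public String value field;
in 1.3 the conditions are SimpleConditions rather than the 1.1-style lambdas):

// Rough sketch: in 1.3, followedBy uses SKIP_TILL_NEXT, so the timeout no longer
// fires once a match was found; followedByAny would restore the old
// SKIP_TILL_ANY behaviour.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event evt) {
                return evt.value.equals("ni");
            }
        })
        .followedBy("last")   // swap for followedByAny("last") to get SKIP_TILL_ANY
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event evt) {
                return evt.value.equals("ar");
            }
        })
        .within(Time.seconds(5));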

Moiz

On Sat, Apr 29, 2017 at 11:08 PM, Kostas Kloudas <
k.klou...@data-artisans.com> wrote:

> Hi Moiz,
>
> Here are the instructions on how to build Flink from source:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html
>
> Kostas
>
> On Apr 29, 2017, at 7:15 PM, Moiz S Jinia  wrote:
>
> I meant maven dependencies that I can use by generating them from sources.
>
> On Sat, Apr 29, 2017 at 10:31 PM, Moiz S Jinia 
> wrote:
>
>> Ok I'll try that. It's just that I'd rather use a stable version.
>> Are there any instructions for building binaries from latest sources?
>>
>> Moiz
>>
>> On Sat, Apr 29, 2017 at 10:09 PM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Moiz,
>>>
>>> The skip-till-next is a big change and backporting it does not seem
>>> feasible.
>>> Also this would require more general changes to the 1.2 branch to make it
>>> compatible with the previous 1.2 versions.
>>>
>>> If you want you can already use the 1.3 version by downloading the
>>> master branch and writing your
>>> use-case against that. The changes until the final release are going to
>>> be minor hopefully and we can
>>> always help you adjust your program accordingly.
>>>
>>> Hope this helps,
>>> Kostas
>>>
>>> On Apr 29, 2017, at 6:23 PM, Moiz S Jinia  wrote:
>>>
>>> Oh ok, that's a bit far off. Is there any chance of a backport of
>>> https://issues.apache.org/jira/browse/FLINK-6208 to the 1.2 branch? I
>>> require the SKIP_TILL_NEXT behaviour for a production use case that we want
>>> to use Flink for.
>>>
>>> Moiz
>>>
>>> On Sat, Apr 29, 2017 at 9:49 PM, Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
 The 1.3 is scheduled for the beginning of June.

 Cheers,
 Kostas

 On Apr 29, 2017, at 6:16 PM, Moiz S Jinia  wrote:

 Thanks Dawid!
 Yes, that's what I was expecting. I'll give it a try.

 When do you expect 1.3.0 stable to be out?

 Moiz

 On Sat, Apr 29, 2017 at 9:20 PM, Dawid Wysakowicz <
 wysakowicz.da...@gmail.com> wrote:

> Hi,
>
> This is an expected behaviour. After the "ar" event there may still
> occur another "ar" event that would also trigger a match.
> More generally, in all versions prior to 1.3.0 there are two
> different consuming strategies:
>
>    - STRICT (the "next" operator) - accepts an event only if it
>    occurs directly after the previous one
>    - SKIP TILL ANY (the "followedBy" operator) - accepts any matching
>    event that follows, even if an earlier event already matched this
>    pattern
>
> Because after the "ni" event we could still match some other "ar" events,
> the match is timed out after 5 seconds.
>
> In FLINK-6208 we
> introduced a third consuming strategy:
>
>    - SKIP TILL NEXT (this is the strategy for followedBy right now) -
>    the event does not have to occur directly after the previous one,
>    but only one event can be matched
>
> and you can still use SKIP TILL ANY by using followedByAny. I believe
> the SKIP TILL NEXT strategy is the one you expected.
> You can check it on the master branch. We did introduce lots of new
> features and bugfixes to CEP for the 1.3.0 version, so any comments,
> tests or suggestions are welcome.
>
>
> Z pozdrowieniami! / Cheers!
>
> Dawid Wysakowicz
> *Data/Software Engineer*
> Skype: dawid_wys | Twitter: @OneMoreCoder
> 
>
> 2017-04-29 12:14 GMT+02:00 Moiz S Jinia :
>
>> When using "next", this pattern works fine for both a match as
>> well as a timeout:
>>
>> Pattern pattern = Pattern.begin("start")
>> .where(evt -> evt.value.equals("ni"))
>> .next("last").where(evt -> evt.value.equals("ar"))
>> .within(Time.seconds(5));
>>
>> 1. "ni" then "ar" within 5 seconds - triggers match
>> 2. "ni" then no "ar" within 5 seconds - triggers timeout
>>
>> But with "followedBy", this does not behave as expected:
>>
>> Pattern pattern = Pattern.begin("start")
>> .where(evt -> evt.value.equals("ni"))
>> .followedBy("last").where(evt -> evt.value.equals("ar"))
>> .within(Time.seconds(5));
>>
>> "ni" then "ar" within 5 seconds - triggers match and also triggers
>> timeout.
>>
>> Why is the timeout triggered when using followedBy (when there is a
>> match)?
>>
>> Version - 1.1.5.
>>
>
>


>>>
>>>
>>
>
>


Re: CEP timeout occurs even for a successful match when using followedBy

2017-05-02 Thread Kostas Kloudas
Glad to hear that Moiz!
And thanks for helping us test out the library.

Kostas

> On May 2, 2017, at 12:34 PM, Moiz S Jinia  wrote:
> 
> Thanks! I downloaded and built 1.3-SNAPSHOT locally and was able to verify 
> that followedBy now works as I want.
> 
> Moiz
> 

Re: Batch source improvement

2017-05-02 Thread Fabian Hueske
Hi Flavio,

actually, Flink has always assigned input splits lazily. The JobManager gets the list
of input splits from the InputFormat.
Parallel source instances (with an input format) request an input split
from the JobManager whenever they do not have anything to do.
This should actually balance some of the data skew in input splits.
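
Conceptually, the lazy assignment boils down to something like the following (a
simplified sketch, not the actual JobManager code):

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.core.io.InputSplit;

// Simplified sketch (not the actual JobManager code): splits sit in a shared
// pool and are handed out on request, so fast source tasks simply come back
// for another split while slow tasks keep working on their current one.
public class LazySplitPool {

    private final Queue<InputSplit> unassigned;

    public LazySplitPool(InputSplit[] splits) {
        this.unassigned = new ConcurrentLinkedQueue<>(Arrays.asList(splits));
    }

    /** Called by a source task whenever it has finished its current split. */
    public InputSplit requestNextSplit() {
        return unassigned.poll(); // null means there is no more work
    }
}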

Best, Fabian

2017-04-29 11:36 GMT+02:00 Flavio Pompermaier :

> Hi to all,
> we're still using Flink as a batch processor and, despite not being very
> advertised, it is still doing great.
> However there's one thing I always wanted to ask: when reading data from a
> source, the job manager computes the splits and assigns a set of them to
> every instance of the InputFormat. This works fine as long as the data is
> perfectly balanced, but in my experience most of the time this is not true
> and some instances complete very quickly while others continue to read
> data (also for a long time).
>
> Couldn't this be enhanced by buffering splits in a shared place so that tasks
> could ask for a "free" split as soon as they complete reading their
> assigned split? Would it be complicated to implement such a logic?
>
> Best,
> Flavio
>


RE: Collector.collect

2017-05-02 Thread Newport, Billy
Why doesn’t this work with batch, though? We did:

input = ...
input.filter(conditionA).output(formatA)
input.filter(conditonB).output(formatB)


And it was pretty slow compared with a custom outputformat with an integrated 
filter.


From: Chesnay Schepler [mailto:ches...@apache.org]
Sent: Monday, May 01, 2017 12:56 PM
To: Newport, Billy [Tech]; 'user@flink.apache.org'
Subject: Re: Collector.collect

Oh you have multiple different output formats, missed that.

For the Batch API you are, I believe, correct: using a custom output format is 
the best solution.

In the Streaming API the code below should be equally fast, if the filtered 
sets don't overlap.

input = ...
input.filter(conditionA).output(formatA)
input.filter(conditonB).output(formatB)

That is because all filters would be chained; hell, all sources might be as well 
(not too sure on this one).

On 01.05.2017 17:05, Newport, Billy wrote:
There is likely a bug then; the (ENUM, Record) stream going to a filter and then to a set of 
output formats per filter was slower than the (BITMASK, Record) stream going to a single 
OutputFormat which demuxes the data to each file internally

Are you saying do a custom writer inside a map rather than either of the 2 
above approaches?


From: Chesnay Schepler [mailto:ches...@apache.org]
Sent: Monday, May 01, 2017 10:41 AM
To: user@flink.apache.org
Subject: Re: Collector.collect

Hello,

@Billy, what prevented you from duplicating/splitting the record, based on the 
bitmask, in a map function before the sink?
This shouldn't incur any serialization overhead if the sink is chained to the 
map. The emitted Tuple could also share the
GenericRecord; meaning you don't even have to copy it.

On 01.05.2017 14:52, Newport, Billy wrote:
We’ve done that but it’s very expensive from a serialization point of view when 
writing the same record multiple times, each in a different tuple.

For example, we started with this:

.collect(new Tuple ...

From: [mailto:gaurav671...@gmail.com]
Sent: Saturday, April 29, 2017 4:32 AM
To: user@flink.apache.org
Subject: Collector.collect

Hello

I am working on RichProcessFunction and I want to emit multiple records at a 
time. To achieve this, I am currently doing :

while(condition)
{
   Collector.collect(new Tuple<>...);
}

I was wondering, is this the correct way or is there any other alternative?








Re: Collector.collect

2017-05-02 Thread Chesnay Schepler

In the Batch API only a single operator can be chained to another operator.

So we're starting with this code:

   input = ...
   input.filter(conditionA).output(formatA)
   input.filter(conditonB).output(formatB)

In the Batch API this would create a CHAIN(filterA -> formatA) and a 
CHAIN(filterB -> formatB), both having "input" as their input.
Since the filtering is not done as part of "input" the entire input 
DataSet must be sent to both tasks.
This means that both chains have to deserialize the entire DataSet to 
apply the filter; the serialization should only be done once though.


In contrast the solution you wrote creates a single CHAIN(input, 
format), with no serialization in between at all.


The Streaming API doesn't have this limitation and would get by without 
any serialization as well. Probably.
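
For reference, the map-based splitting I suggested earlier would look roughly 
like this in the Streaming API (a sketch only; the Destination enum and its 
bit() mapping are made up, GenericRecord is Avro's, and "input" is assumed to 
be a DataStream<Tuple2<Integer, GenericRecord>>):

// Sketch only: emit one (destination, record) pair per set bit in the bitmask.
// The GenericRecord instance is shared, not copied, and if the sinks are
// chained to this function there is no serialization in between either.
DataStream<Tuple2<Destination, GenericRecord>> split = input.flatMap(
    new FlatMapFunction<Tuple2<Integer, GenericRecord>, Tuple2<Destination, GenericRecord>>() {
        @Override
        public void flatMap(Tuple2<Integer, GenericRecord> value,
                            Collector<Tuple2<Destination, GenericRecord>> out) {
            for (Destination d : Destination.values()) {
                if ((value.f0 & d.bit()) != 0) {
                    out.collect(Tuple2.of(d, value.f1));
                }
            }
        }
    });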


On 02.05.2017 15:23, Newport, Billy wrote:


Why doesn’t this work with batch, though? We did:

input = ...
input.filter(conditionA).output(formatA)
input.filter(conditonB).output(formatB)

And it was pretty slow compared with a custom output format with an 
integrated filter.


*From:*Chesnay Schepler [mailto:ches...@apache.org]
*Sent:* Monday, May 01, 2017 12:56 PM
*To:* Newport, Billy [Tech]; 'user@flink.apache.org'
*Subject:* Re: Collector.collect

Oh you have multiple different output formats, missed that.

For the Batch API you are i believe correct, using a custom 
output-format is the best solution.


In the Streaming API the code below should be equally fast, if the 
filtered sets don't overlap.


input = ...
input.filter(conditionA).output(formatA)
input.filter(conditonB).output(formatB)

That is because all filters would be chained; hell all sources might 
be as well (not to sure on this one).


On 01.05.2017 17:05, Newport, Billy wrote:

There is likely a bug then, the ENUM,Record stream to a filter to
a set of outputformats per filter was slower than the
BITMASK,Record to single OutputFormat which demux’s the data to
each file internally

Are you saying do a custom writer inside a map rather than either
of the 2 above approaches?

*From:*Chesnay Schepler [mailto:ches...@apache.org]
*Sent:* Monday, May 01, 2017 10:41 AM
*To:* user@flink.apache.org 
*Subject:* Re: Collector.collect

Hello,

@Billy, what prevented you from duplicating/splitting the record,
based on the bitmask, in a map function before the sink?
This shouldn't incur any serialization overhead if the sink is
chained to the map. The emitted Tuple could also share the
GenericRecord; meaning you don't even have to copy it.

On 01.05.2017 14:52, Newport, Billy wrote:

We’ve done that but it’s very expensive from a serialization
point of view when writing the same record multiple times,
each in a different tuple.

For example, we started with this:

.collect(new Tuplemailto:gaurav671...@gmail.com]
*Sent:* Saturday, April 29, 2017 4:32 AM
*To:* user@flink.apache.org 
*Subject:* Collector.collect

Hello

I am working on RichProcessFunction and I want to emit
multiple records at a time. To achieve this, I am currently
doing :

while(condition)

{

 Collector.collect(new Tuple<>...);

}

I was wondering, is this the correct way or there is any other
alternative.





Re: Queryable State

2017-05-02 Thread Chet Masterson
Can do. Any advice on where the trace prints should go in the task manager source code?

BTW - How do I know I have a correctly configured cluster? Is there a set of messages in the job / task manager logs that indicate all required connectivity is present? I know I use the UI to make sure all the task managers are present, and that the job is running on all of them, but is there some verbiage in the logs that indicates the job manager can talk to all the task managers, and vice versa? Thanks!

02.05.2017, 06:03, "Ufuk Celebi":

Hey Chet! I'm wondering why you are only seeing 2 registration messages for 3 task managers. Unfortunately, there is no log message at the task managers when they send out the notification. Is it possible for you to run a remote debugger with the task managers or build a custom Flink version with the appropriate log messages on the task manager side?

– Ufuk

On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson wrote:

Any insight here? I've got a situation where a key value state on a task manager is being registered with the job manager, but when I try to query it, the job manager responds it doesn't know the location of the key value state...

26.04.2017, 12:11, "Chet Masterson":

After setting the logging to DEBUG on the job manager, I learned four things. (On the message formatting below, I have the Flink logs formatted into JSON so I can import them into Kibana.)

1. The appropriate key value state is registered in both parallelism = 1 and parallelism = 3 environments. In parallelism = 1, I saw one registration message in the log; in parallelism = 3, I saw two registration messages:
{"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"", "msg":"Key value state registered for job  under name "}
2. When I issued the query in both parallelism = 1 and parallelism = 3 environments, I saw "Lookup key-value state for job  with registration name ". In parallelism = 1, I saw 1 log message; in parallelism = 3, I saw two identical messages.
3. I saw no other messages in the job manager log that seemed relevant.
4. When issuing the query in parallelism = 3, I continued to get the error org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a message of null.

Thanks!

26.04.2017, 09:52, "Ufuk Celebi":

Thanks! Your config looks good to me. Could you please set the log level of org.apache.flink.runtime.jobmanager to DEBUG?

log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG

Then we can check whether the JobManager logs the registration of the state instance with the respective name in the case of parallelism > 1? Expected output is something like this: "Key value state registered for job ${msg.getJobId} under name ${msg.getRegistrationName}."

– Ufuk

On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson wrote:

Ok... more information.

1. Built a fresh cluster from the ground up. Started testing queryable state at each step.
2. When running under any configuration of task managers and job managers with parallelism = 1, the queries execute as expected.
3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job manager) feeding off a kafka topic partitioned three ways, queries will always fail, returning error (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an error message of null.
4. I do know my state is as expected on the cluster. Liberal use of trace prints shows my state managed on the jobs is as I expect. However, I cannot query it externally.
5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed is configured by using the job manager UI.
6. My flink-conf.yaml:

jobmanager.rpc.address: flink01
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.data.port: 6121
taskmanager.numberOfTaskSlots: 1
taskmanager.memory.preallocate: false
parallelism.default: 1
blob.server.port: 6130
jobmanager.web.port: 8081
query.server.enable: true

7. I do know my job is indeed running in parallel, from trace prints going to the task manager logs.

Do I need a backend configured when running in parallel for the queryable state? Do I need a shared temp directory on the task managers? THANKS!

25.04.2017, 04:24, "Ufuk Celebi":

It's strange that the rpc port is set to 3 when you use a standalone cluster and configure 6123 as the port. I'm pretty sure that the config has not been updated.

But everything should work as you say when you point it to the correct jobmanager address and port. Could you please post the complete stacktrace you get instead of the message you log?

On Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson wrote:

More information:

0. I did remove the query.server.port and query.server.enabled from all flink-conf.yaml files, and restarted the cluster.
1. The Akka error doe

Re: RocksDB error with flink 1.2.0

2017-05-02 Thread Elias Levy
Any reason they can't share a single RocksDB state backend instance?


On Fri, Apr 28, 2017 at 8:44 AM, Aljoscha Krettek 
wrote:

> The problem here is that this will try to open 300 RocksDB instances on
> each of the TMs (depending on how the parallelism is spread between the
> machines this could be more or less). As the exception says, this will open
> too many files because each RocksDB instance has a directory with several
> files in it.
>
> One possible solution would be to increase the limit on open files but I
> don’t think that opening 300 RocksDB instances on one machine is a good
> idea for any size of machine. I think with this many patterns you could
> start thinking about writing the pattern matching yourself and multiplexing
> the several patterns in one stateful function or operator.
>
> @Stefan, what do you think about having this many Rocks instances?
>


Re: RocksDB error with flink 1.2.0

2017-05-02 Thread Aljoscha Krettek
They can’t (with the current design of Flink) because each CEP pattern gets 
executed by a separate operator.

We could think about doing multiplexing of several patterns inside one 
operator. It’s what I hinted at earlier as a possible solution when I mentioned 
that you could implement your own operator that keeps track of the patterns and 
does the pattern matching.
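
Very roughly, such a multiplexing function could look like the following (only a
sketch; Event, Alert, PatternMatcher and MatcherState are made-up types, and the
actual matching logic would live in PatternMatcher):

// Sketch only: run many lightweight pattern matchers inside a single keyed
// function so that one state backend instance serves all patterns, instead
// of one CEP operator (and one RocksDB instance) per pattern.
public class MultiPatternFunction extends RichFlatMapFunction<Event, Alert> {

    private final List<PatternMatcher> patterns;                    // one entry per logical pattern
    private transient ValueState<Map<String, MatcherState>> state;  // per-key state for all patterns

    public MultiPatternFunction(List<PatternMatcher> patterns) {
        this.patterns = patterns;
    }

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "matcher-state",
                TypeInformation.of(new TypeHint<Map<String, MatcherState>>() {})));
    }

    @Override
    public void flatMap(Event event, Collector<Alert> out) throws Exception {
        Map<String, MatcherState> current = state.value();
        if (current == null) {
            current = new HashMap<>();
        }
        for (PatternMatcher pattern : patterns) {
            current.put(pattern.name(), pattern.advance(current.get(pattern.name()), event, out));
        }
        state.update(current);
    }
}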

Best,
Aljoscha
> On 2. May 2017, at 18:00, Elias Levy  wrote:
> 
> Any reason they can't share a single RocksDB state backend instance?
> 
> 
> On Fri, Apr 28, 2017 at 8:44 AM, Aljoscha Krettek wrote:
> The problem here is that this will try to open 300 RocksDB instances on each 
> of the TMs (depending on how the parallelism is spread between the machines 
> this could be more or less). As the exception says, this will open too many 
> files because each RocksDB instance has a directory with several files in it.
> 
> One possible solution would be to increase the limit on open files but I 
> don’t think that opening 300 RocksDB instances on one machine is a good idea 
> for any size of machine. I think with this many patterns you could start 
> thinking about writing the pattern matching yourself and multiplexing the 
> several patterns in one stateful function or operator.
> 
> @Stefan, what do you think about having this many Rocks instances?
> 
> 


