Re: Apache Flink Operator State as Query Cache

2015-11-15 Thread Welly Tambunan
Hi Kostas,

Yes. Exactly. Thanks a lot for this one.

That's really what we need !


Cheers

On Sun, Nov 15, 2015 at 8:53 PM, Kostas Tzoumas  wrote:

> Hi Welly,
>
> This version adds support for specifying and switching between time
> semantics - processing time, ingestion time, or event time.
>
> When working with event time, you can specify watermarks to track the
> progress of event time. So, even if events arrive out of order, windows
> are assigned based on event time (not arrival time), and the computation
> is triggered when the watermark arrives.
>
> You can see the API reference and an example here:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#working-with-time
>
> Is this what you are looking for?
>
> Kostas
>
>
> On Sat, Nov 14, 2015 at 1:54 AM, Welly Tambunan  wrote:
>
>> Hi Robert,
>>
>> Does this version already handle stream imperfection, i.e. out-of-order
>> events?
>>
>> Is there any resource on how this works, and an API reference?
>>
>>
>> Cheers
>>
>> On Fri, Nov 13, 2015 at 4:00 PM, Welly Tambunan 
>> wrote:
>>
>>> Awesome !
>>>
>>> This is really the best weekend gift ever. :)
>>>
>>> Cheers
>>>
>>> On Fri, Nov 13, 2015 at 3:54 PM, Robert Metzger 
>>> wrote:
>>>
>>>> Hi Welly,
>>>> Flink 0.10.0 is out, it's just not announced yet.
>>>> It's available on Maven Central and the global mirrors are currently
>>>> syncing it. This mirror, for example, has the update already:
>>>> http://apache.mirror.digionline.de/flink/flink-0.10.0/
>>>>
>>>> On Fri, Nov 13, 2015 at 9:50 AM, Welly Tambunan
>>>> wrote:
>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> Thanks for this one. Looking forward to the 0.10 release.
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Thu, Nov 12, 2015 at 5:34 PM, Aljoscha Krettek wrote:
>>>>>
>>>>>> Hi,
>>>>>> I don’t know yet when the operator state will be transitioned to
>>>>>> managed memory, but it could happen for 1.0 (which will come after
>>>>>> 0.10). The good thing is that the interfaces won’t change, so state
>>>>>> can be used as it is now.
>>>>>>
>>>>>> For 0.10, the release vote is winding down right now, so you can
>>>>>> expect the release to happen today or tomorrow. I think streaming is
>>>>>> production ready now; we expect to mostly do hardening and some
>>>>>> infrastructure changes (for example, annotations that specify API
>>>>>> stability) for the 1.0 release.
>>>>>>
>>>>>> Let us know if you need more information.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>> > On 12 Nov 2015, at 02:42, Welly Tambunan wrote:
>>>>>> >
>>>>>> > Hi Stephan,
>>>>>> >
>>>>>> > > Storing the model in OperatorState is a good idea, if you can. On
>>>>>> > > the roadmap is to migrate the operator state to managed memory as
>>>>>> > > well, so that should take care of the GC issues.
>>>>>> >
>>>>>> > Does this use off-heap memory? In which version can we expect this
>>>>>> > to be available?
>>>>>> >
>>>>>> > Another question: when will the 0.10 release be out? We would love
>>>>>> > to upgrade to it when it's available. That version will be
>>>>>> > production-ready for streaming, right?
>>>>>> >
>>>>>> > On Wed, Nov 11, 2015 at 4:49 PM, Stephan Ewen wrote:
>>>>>> > Hi!
>>>>>> >
>>>>>> > In general, if you can keep state in Flink, you get better
>>>>>> > throughput/latency/consistency and have one less system to worry
>>>>>> > about (external k/v store). State outside means that the Flink
>>>>>> > processes can be slimmer and need fewer resources and as such
>>>>>> > recover a bit faster. There are use cases for that as well.
>>>>>> >
>>>>>> > Storing the model in OperatorState is a good idea, if you can. On
>>>>>> > the roadmap is to migrate the operator state to managed memory as
>>>>>> > well, so that should take care of the GC issues.
>>>>>> >
>>>>>> > We are just adding functionality to make the Key/Value operator
>>>>>> > state usable in CoMap/CoFlatMap as well (currently it only works in
>>>>>> > windows and in Map/FlatMap/Filter functions over the KeyedStream).
>>>>>> > Until then, you should be able to use a simple Java HashMap and use
>>>>>> > the "Checkpointed" interface to make it persistent.
>>>>>> >
>>>>>> > Greetings,
>>>>>> > Stephan
>>>>>> >
>>>>>> >
>>>>>> > On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan wrote:
>>>>>> > Thanks for the answer.
>>>>>> >
>>>>>> > The approach I'm using right now is to create a base/marker
>>>>>> > interface so that different types of messages can be streamed to the
>>>>>> > same operator. I'm not sure how the performance of this compares to
>>>>>> > the CoFlatMap function.
>>>>>> >
>>>>>> > Basically this provides a query cache, so I'm thinking that instead
>>>>>> > of using an in-memory cache like Redis, Ignite, etc., I can just use
>>>>>> > operator state for 
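
Stephan's suggestion quoted above (keep the cache in a plain Java HashMap and persist it through the "Checkpointed" interface) could look roughly like the sketch below. It is a minimal illustration, not code from this thread: the nested Checkpointed interface is a local stand-in modeled on the Flink 0.10 interface so that the example compiles without Flink on the classpath, and QueryCacheSketch, QueryCache, put and get are hypothetical names.

```java
import java.io.Serializable;
import java.util.HashMap;

public class QueryCacheSketch {

    // Local stand-in modeled on Flink 0.10's Checkpointed interface (an
    // assumption; a real job would implement the interface shipped with Flink).
    public interface Checkpointed<T extends Serializable> {
        T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
        void restoreState(T state) throws Exception;
    }

    // Hypothetical operator body: maps a query key to its cached result.
    public static class QueryCache implements Checkpointed<HashMap<String, String>> {
        private HashMap<String, String> cache = new HashMap<>();

        public void put(String query, String result) {
            cache.put(query, result);
        }

        public String get(String query) {
            return cache.get(query);
        }

        @Override
        public HashMap<String, String> snapshotState(long checkpointId, long checkpointTimestamp) {
            // Hand out a copy so later updates do not leak into the snapshot.
            return new HashMap<>(cache);
        }

        @Override
        public void restoreState(HashMap<String, String> state) {
            cache = new HashMap<>(state);
        }
    }

    public static void main(String[] args) {
        QueryCache cache = new QueryCache();
        cache.put("q1", "result-1");
        HashMap<String, String> snapshot = cache.snapshotState(1L, 0L);
        cache.put("q2", "result-2"); // written after the checkpoint

        // Simulate recovery: a fresh instance restored from the last snapshot.
        QueryCache recovered = new QueryCache();
        recovered.restoreState(snapshot);
        System.out.println(recovered.get("q1")); // result-1
        System.out.println(recovered.get("q2")); // null, it was never checkpointed
    }
}
```

Anything written after the last snapshot is lost on recovery, which is the usual trade-off of checkpoint-based persistence.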

Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-15 Thread Konstantin Knauf
Hi everyone,

I have the following issue with Flink (0.10) and Kafka.

I am using a very simple TimestampExtractor like [1], which just
extracts a millis timestamp from a POJO. In my streaming job, I read in
these POJOs from Kafka using the FlinkKafkaConsumer082 like this:

stream = env.addSource(new FlinkKafkaConsumer082<>(
        parameterTool.getRequired("topic"),
        new AvroPojoDeserializationSchema(),
        parameterTool.getProperties()));

I have timestamps enabled, the TimeCharacteristic is EventTime, and the
AutoWatermarkInterval is 500.

The problem is, when I do something like:

stream.assignTimestamps(new PojoTimestampExtractor(6000))
        .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
        .sum(..)
        .print();

env.execute();

the windows never get triggered.

If I use ProcessingTime it works.
If I use env.fromCollection(...) instead of the KafkaSource it works
with EventTime, too.

Any ideas what I could be doing wrong are highly appreciated.

Cheers,

Konstantin

[1]:

public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {

    private final long maxDelay;

    public PojoTimestampExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public long extractTimestamp(Pojo pojo, long l) {
        return pojo.getTime();
    }

    @Override
    public long extractWatermark(Pojo pojo, long l) {
        // The watermark trails the element's timestamp by maxDelay.
        return pojo.getTime() - maxDelay;
    }

    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE;
    }
}
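
One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: an operator's event-time clock only advances to the minimum watermark across its parallel inputs. If the Kafka source runs with more subtasks than there are partitions carrying data, an idle subtask would hold that minimum at Long.MIN_VALUE, whereas env.fromCollection(...) runs as a single, non-parallel source. The plain-Java sketch below simulates this without Flink. WatermarkSketch and all of its members are hypothetical names, and the window-firing rule is deliberately simplified.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

public class WatermarkSketch {

    static final long WINDOW_SIZE_MS = 1000L;

    // One watermark per parallel source instance; Long.MIN_VALUE = nothing seen yet.
    private final long[] sourceWatermarks;
    // Open windows keyed by window start; the value is the buffered element count.
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();
    private final List<Long> firedWindowStarts = new ArrayList<>();

    public WatermarkSketch(int numSources) {
        sourceWatermarks = new long[numSources];
        Arrays.fill(sourceWatermarks, Long.MIN_VALUE);
    }

    // Mirrors extractWatermark in [1]: the watermark trails the timestamp by maxDelay.
    public void onElement(int source, long timestamp, long maxDelay) {
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
        openWindows.merge(windowStart, 1, Integer::sum);
        sourceWatermarks[source] = Math.max(sourceWatermarks[source], timestamp - maxDelay);
        fireReadyWindows();
    }

    // The operator's event-time clock is the minimum over all of its inputs.
    public long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : sourceWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // Simplified rule: a window fires once the watermark reaches its end.
    private void fireReadyWindows() {
        long watermark = currentWatermark();
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + WINDOW_SIZE_MS <= watermark) {
            firedWindowStarts.add(openWindows.pollFirstEntry().getKey());
        }
    }

    // Feeds the same three events through source 0 and reports which windows fired.
    public static List<Long> run(int numSources) {
        WatermarkSketch op = new WatermarkSketch(numSources);
        for (long ts : new long[] {1000L, 2500L, 9000L}) {
            op.onElement(0, ts, 6000L);
        }
        return op.firedWindowStarts;
    }

    public static void main(String[] args) {
        System.out.println(run(1)); // [1000, 2000]
        System.out.println(run(2)); // [] because source 1 never emits a watermark
    }
}
```

With a single source the first two windows fire once the event at 9000 pushes the watermark to 3000. With a second, silent source the minimum never moves and no window fires.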


Different CoGroup behavior inside DeltaIteration

2015-11-15 Thread Truong Duc Kien

Hi,

When running CoGroup between the solution set and a different dataset
inside a DeltaIteration, the CoGroupFunction only gets called for items
that exist in the other dataset, similar to an inner join. This is not
the documented behavior for CoGroup:


If a DataSet has a group with no matching key in the other DataSet,
the CoGroupFunction is called with an empty group for the non-existing
group.


The following code shows the problem.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object CoGroupExample {

  def coGroupFuntion(first: Iterator[(Int, Int)],
                     second: Iterator[(Int, Int)],
                     out: Collector[(Int, Int)]): Unit = {
    if (second.hasNext) {
      out.collect(second.next)
    } else {
      printf("Not in second set: %s\n", first.next)
      println("These two lines doesn't appear when " +
        "running cogroup on solution set")
    }
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()

    val d1 = env.fromElements(
      new Tuple2(1, 1),
      new Tuple2(2, 1),
      new Tuple2(3, 1)
    )

    d1.iterateDelta(d1, 1, Array(0)) {
      (solutionSet, workSet) => {
        val f = workSet.filter(_._1 != 1)
        println("Cogroup on solution set with delta iteration")
        val newSolutionSet = solutionSet.coGroup(f)
          .where(0)
          .equalTo(0)
          .apply(coGroupFuntion _)
        (newSolutionSet, newSolutionSet)
      }
    }.print()

    println("Normal cogroup")
    val d2 = d1.filter(_._1 != 1)
    d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFuntion _).print()
  }
}
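
For reference, the documented CoGroup contract quoted above (the function is called once per key in the union of both inputs, with an empty group on the side that has no match) can be sketched in plain Java without Flink. CoGroupSketch, groupByKey and coGroup are hypothetical names used only for illustration; the data mirrors d1 and d2 from the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class CoGroupSketch {

    // Groups (key, value) tuples by their first field.
    public static Map<Integer, List<int[]>> groupByKey(List<int[]> tuples) {
        Map<Integer, List<int[]>> groups = new HashMap<>();
        for (int[] t : tuples) {
            groups.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t);
        }
        return groups;
    }

    // Visits every key present in either input; a missing side is an empty group.
    public static List<String> coGroup(List<int[]> first, List<int[]> second) {
        Map<Integer, List<int[]>> left = groupByKey(first);
        Map<Integer, List<int[]>> right = groupByKey(second);
        TreeSet<Integer> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());

        List<String> calls = new ArrayList<>();
        for (Integer key : keys) {
            List<int[]> l = left.getOrDefault(key, Collections.emptyList());
            List<int[]> r = right.getOrDefault(key, Collections.emptyList());
            calls.add("key=" + key + " first=" + l.size() + " second=" + r.size());
        }
        return calls;
    }

    public static void main(String[] args) {
        List<int[]> d1 = Arrays.asList(new int[] {1, 1}, new int[] {2, 1}, new int[] {3, 1});
        List<int[]> d2 = Arrays.asList(new int[] {2, 1}, new int[] {3, 1});
        for (String call : coGroup(d1, d2)) {
            System.out.println(call);
        }
        // key=1 first=1 second=0
        // key=2 first=1 second=1
        // key=3 first=1 second=1
    }
}
```

Under this contract key 1 is still visited with an empty second group, which is the call that goes missing when the solution set is one of the inputs.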



Is this the expected behavior, or should I file a bug?

Best regards,
Kien Truong