RE: Lower Parallelism derives better latency

2018-01-03 Thread Netzer, Liron
Hi Stefan,
Thanks for replying.

All of the tests below were executed with a buffer timeout of zero:

 env.setBufferTimeout(0);
so this means that the buffers were flushed after each record.

Any other explanation? :)

Thanks,
Liron


From: Stefan Richter [mailto:s.rich...@data-artisans.com]
Sent: Wednesday, January 03, 2018 3:20 PM
To: Netzer, Liron [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: Lower Parallelism derives better latency

Hi,

one possible explanation that I see is the following: in a shuffle, each there 
are input and output buffers for each parallel subtask to which data could be 
shuffled. Those buffers are flushed either when full or after a timeout 
interval. If you increase the parallelism, there are more buffers and each 
buffer gets a smaller fraction of the data. This, in turn, means that it takes 
longer until an individual buffer is full and data is emitted. The timeout 
interval enforces an upper bound.

Your experiments works on a very small scale, and I would not assume that this 
would increase latency without bounds - at least once you hit the buffer 
timeout interval the latency should no longer increase. You could validate this 
by configuring smaller buffer sizes and test how this impacts the experiment.

Best,
Stefan


Am 03.01.2018 um 08:13 schrieb Netzer, Liron 
>:

Hi group,

We have a standalone Flink cluster that is running on a UNIX host with 40 CPUs 
and 256GB RAM.
There is one task manager and 24 slots were defined.
When we decrease the parallelism of the Stream graph operators(each operator 
has the same parallelism),
we see a consistent change in the latency, it gets better:
Test run

Parallelism

99 percentile

95 percentile

75 percentile

Mean

#1

8

4.15 ms

2.02 ms

0.22 ms

0.42 ms

#2

7

3.6 ms

1.68 ms

0.14 ms

0.34 ms

#3

6

3 ms

1.4 ms

0.13 ms

0.3 ms

#4

5

2.1 ms

0.95 ms

0.1 ms

0.22 ms

#5

4

1.5 ms

0.64 ms

0.09 ms

0.16 ms


This was a surprise for us, as we expected that higher parallelism will derive 
better latency.
Could you try to assist us to understand this behavior?
I know that when there are more threads that are involved, there is probably 
more serialization/deserialization, but this can't be the only reason for this 
behavior.

We have two Kafka sources, and the rest of the operators are fixed windows, 
flatmaps, coMappers and several KeyBys.
Except for the Kafka sources and some internal logging, there is no other I/O 
(i.e. we do not connect to any external DB/queue)
We use Flink 1.3.


Thanks,
Liron



RE: Lower Parallelism derives better latency

2018-01-03 Thread Netzer, Liron
Hi Aljoscha,
The latency is measured with Flink MetricGroup (specifically with 
"DropwizardHistogram").
The latency is measured from message read time (i.e. from when the message is 
pulled from Kafka source) until the last operator completes the 
processing(there is no Kafka sink).

Thanks,
Liron

From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Wednesday, January 03, 2018 3:03 PM
To: Netzer, Liron [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: Lower Parallelism derives better latency

Hi,

How are you measuring latency? Is it latency within a Flink Job or from Kafka 
to Kafka? The first seems more likely but I'm still interested in the details.

Best,
Aljoscha


On 3. Jan 2018, at 08:13, Netzer, Liron 
> wrote:

Hi group,

We have a standalone Flink cluster that is running on a UNIX host with 40 CPUs 
and 256GB RAM.
There is one task manager and 24 slots were defined.
When we decrease the parallelism of the Stream graph operators(each operator 
has the same parallelism),
we see a consistent change in the latency, it gets better:
Test run

Parallelism

99 percentile

95 percentile

75 percentile

Mean

#1

8

4.15 ms

2.02 ms

0.22 ms

0.42 ms

#2

7

3.6 ms

1.68 ms

0.14 ms

0.34 ms

#3

6

3 ms

1.4 ms

0.13 ms

0.3 ms

#4

5

2.1 ms

0.95 ms

0.1 ms

0.22 ms

#5

4

1.5 ms

0.64 ms

0.09 ms

0.16 ms


This was a surprise for us, as we expected that higher parallelism will derive 
better latency.
Could you try to assist us to understand this behavior?
I know that when there are more threads that are involved, there is probably 
more serialization/deserialization, but this can't be the only reason for this 
behavior.

We have two Kafka sources, and the rest of the operators are fixed windows, 
flatmaps, coMappers and several KeyBys.
Except for the Kafka sources and some internal logging, there is no other I/O 
(i.e. we do not connect to any external DB/queue)
We use Flink 1.3.


Thanks,
Liron



Exception on running an Elasticpipe flink connector

2018-01-03 Thread vipul singh
Hello,

We are working on a Flink ES connector, sourcing from a kafka stream, and
sinking data into elasticsearch. The code works fine in intellij, but while
running the code on emr(version 5.9, which uses flink 1.3.2) using
flink-yarn-session, we are seeing this exception

Using the parallelism provided by the remote cluster (1). To use
another parallelism, set it at the ./bin/flink client.

Starting execution of program

2018-01-02 23:19:16,217 INFO  org.apache.flink.yarn.YarnClusterClient
 - Starting program in interactive mode



 The program finished with the following exception:

java.lang.NoSuchMethodError:
io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;

at 
org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)

at 
org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:449)

at 
org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:91)

at 
org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:976)

at 
org.elasticsearch.transport.TcpTransport.sendRequest(TcpTransport.java:958)

at 
org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:520)

at 
org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:465)

at 
org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:451)

at 
org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:403)

at 
org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:338)

at 
org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:179)

at 
org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:301)

On searching online, it seems like this maybe due to netty version
conflicts.
However when we ran a dependency tree on our pom, and we dont see netty
coming from anywhere else but flink:
https://gist.github.com/neoeahit/b42b435e3c4519e632be87782de1cc06

Could you please suggest how can we resolve this error,

Thanks,
Vipul


Can't call getProducedType on Avro messages with array types

2018-01-03 Thread Kyle Hamlin
Hi,

It appears that Kryo can't properly extract/deserialize Avro array types. I
have a very simple Avro schema that has an array type and when I remove the
array field the error is not thrown. Is there any way around this without
using a specific type?

*Avro Schema:*
{
"type": "record",
"name": "Recieved",
"fields": [
{"name": "id", "type": "int"},
{"name": "time", "type": "long"},
{"name": "urls", "type": {"type": "array", "items": "string"}},
]
}

*Deserializer:*

import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig,
KafkaAvroDeserializer}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class AvroDeserializer[T <: GenericRecord :
ClassTag](schemaRegistryUrl: String) extends
KeyedDeserializationSchema[T] {

  @transient lazy val keyDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
  Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG ->
schemaRegistryUrl).asJava,
  true)
deserializer
  }

  // Flink needs the serializer to be serializable => this "@transient
lazy val" does the trick
  @transient lazy val valueDeserializer: KafkaAvroDeserializer = {
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(
  // other schema-registry configuration parameters can be passed,
see the configure() code
  // for details (among other things, schema cache size)
  Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG ->
schemaRegistryUrl).asJava,
  false)
deserializer
  }

  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
   topic: String, partition: Int, offset: Long): T = {
valueDeserializer.deserialize(topic, message).asInstanceOf[T]
  }

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = {

TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
  }

}

*Stacktrace:*
Cluster configuration: Standalone cluster with JobManager at localhost/
127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for
job completion.
Connected to JobManager at
Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259]
with leader session id ----.
01/03/2018 15:19:57 Job execution switched to status RUNNING.
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to
SCHEDULED
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to
DEPLOYING
01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
01/03/2018 15:19:59 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
at

BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

2018-01-03 Thread Kyle Hamlin
Hello,

After moving to Flink 1.4.0 I'm getting the following error. I can't find
anything online that addresses it. Is it a Hadoop dependency issue? Here
are my project dependencies:

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  "org.apache.flink" % "flink-metrics-core" % flinkVersion,
  "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
  "org.apache.kafka" %% "kafka" % "0.10.0.1",
  "org.apache.avro" % "avro" % "1.7.7",
  "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
  "org.apache.parquet" % "parquet-avro" % "1.8.1",
  "io.confluent" % "kafka-avro-serializer" % "3.2.0",
  "org.apache.hadoop" % "hadoop-common" % "3.0.0"
)

*Stacktrace:*
Cluster configuration: Standalone cluster with JobManager at localhost/
127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for
job completion.
Connected to JobManager at
Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259]
with leader session id ----.
01/03/2018 14:20:52 Job execution switched to status RUNNING.
01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
java.lang.RuntimeException: Error while creating FileSystem when
initializing the state of the BucketingSink.
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI:
hdfs://localhost:12345/
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 more
Caused by: java.lang.ClassCastException:
org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to
org.apache.hadoop.ipc.RpcEngine
at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
at
org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
at
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:678)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:619)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
... 13 more


Re: Flink CEP with event time

2018-01-03 Thread shashank agarwal
@Dawid, I was using 1.3.2, I have checked on 1.4.0 also still facing the
same issue.


@Aljoscha, I have to cover the case where B can come after A from Kafka.
How I can achieve this as Event Time is not working. How should I implement
this?

 A followedBy B.

As I am using kafka source and my event API's using load balancers so
sometimes B comes before A. So my CEP doesn't generate any result for those
events.

I am trying to use Event time like this. Am I am doing anything wrong?


 kafkaSource.assignTimestampsAndWatermarks(
new
BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
  override def extractTimestamp(event: Event): Long = {
try {
  val originTime = event.origTimestamp.getOrElse("0").toLong
  if(originTime <= 0)
{
  val serverTime =
event.serverTimestamp.getOrElse("0").toLong
  if(serverTime <= 0)
{
  System.currentTimeMillis()
}
  else
{
  serverTime
}
}
  else {
originTime
  }
}
catch {
  case e: Exception => Log.error("OriginTimestamp Exception
occured, "error", e.printStackTrace);
System.currentTimeMillis()
}
  }
}
  )
‌

On Wed, Jan 3, 2018 at 9:42 PM, Dawid Wysakowicz  wrote:

> Hi shashank,
>
> What version of flink are you using? Is it possible that you are hitting
> this issue: https://issues.apache.org/jira/browse/FLINK-7563 ?
>
> Watermark semantics in CEP was buggy and events were processed only if its
> timestamp was lower than current watermark while it should be lower or
> equal.
>
> Best
> Dawid
>
> > On 3 Jan 2018, at 17:05, shashank agarwal  wrote:
> >
> > ssed A with origTimestamp Y. (
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things


Re: Job Manager not able to fetch job info when restarted

2018-01-03 Thread Sushil Ks
Hi Stefan,
 It was just ran without HA, via yarn. Will try running a yarn
session with HA and test, thanks for you time. Also is there a way we can
get alerts if a pipeline restarts or gets cancelled in Flink?

Regards,
Sushil Ks

On Jan 3, 2018 7:07 PM, "Stefan Richter" 
wrote:

> Hi,
>
> did you configure high availability with Zookeeper, as described here:
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/setup/jobmanager_high_availability.html ? This should bring
> the desired behaviour.
>
> Best,
> Stefan
>
> Am 03.01.2018 um 14:04 schrieb Sushil Ks :
>
> Hi,
>   We are launching Flink on Yarn job, here whenever the job manager
> gets restarted, the previous task that was running doesn't get restarted
> instead it shows no jobs running. Is there a way to resume previous running
> task, when job manager restarts.
>
> Regards,
> Sushil Ks
>
>
>


Re: Apache Flink - Connected Stream with different number of partitions

2018-01-03 Thread M Singh
Thanks Aljoscha and Timo for your answers.  I will try to digest the pointers 
you provided.
Mans 

On Wednesday, January 3, 2018 3:01 AM, Aljoscha Krettek 
 wrote:
 

 Hi,
The answer is correct but I'll try and elaborate a bit: the way data is sent to 
downstream operations depends on a couple of things in this case:
 - parallelism of first input operation - parallelism of second input operation 
- parallelism of co-operation - transmission pattern on first input (broadcast, 
rebalance, etc.) - transmission pattern on second input
Note that there is no parallelism on "streams" since there are technically no 
streams but only operations that are interconnected in a certain way.
Now, if the input parallelism and the operation parallelism are the same and 
you don't specify a transmission pattern then data will not be "shuffled" 
between the operations. If you specify broadcast or rebalance then you will get 
that, i.e. for broadcast an element from the input operator will be sent to 
every instance on the downstream operation.
Best,Aljoscha

On 3. Jan 2018, at 10:43, Timo Walther  wrote:
 
 Hi Mans,
 
 I did a quick test on my PC where I simply set breakpoints in map1 and map2 
(someStream has parallelism 1, otherStream 5, my CoMapFunction 8). Elements of 
someStream end up in different CoMapTasks (2/8, 7/8 etc.).
 
 So I guess the distribution is a round robin partioning. @Aljoscha might know 
more about the internals?
 
 Regards,
 Timo
 
 
 
 Am 12/31/17 um 10:38 PM schrieb M Singh:
  
  Hi: 
  Referring to 
documentation(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html)
 for ConnectedStreams: 
  "Connects" two data streams retaining their types. Connect allowing for 
shared state between the two streams.  DataStream someStream = //...
DataStream otherStream = //...

ConnectedStreams connectedStreams = 
someStream.connect(otherStream);  
  If the two connected streams have different number of partitions, eg 
(someStream has 4 and otherStream has 2), then how do the elements of the 
stream get distributed for the CoMapFunction: 
   connectedStreams.map(new CoMapFunction() {
@Override
public Boolean map1(Integer value) {
return true;
}

@Override
public Boolean map2(String value) {
return false;
}
});  
  I believe that that if the second stream is broadcast, then each partition of 
the first will get all the elements of the second.  Is my understanding correct 
? 
  If the streams are not broadcast and since the first stream has 4 partitions 
and second one had 2, then how are the elements of the second stream 
distributed to each partition of the first ? 
  Also, if the streams are not broadcasted but have same number of partitions, 
how are the elements distributed ? 
  Thanks 
  Mans 
  
  
  
   

  


   

Re: Apache Flink - Question about rolling window function on KeyedStream

2018-01-03 Thread M Singh
Hi Stefan:
Thanks for your response.
A follow up question - In a streaming environment, we invoke the operation 
reduce and then output results to the sink. Does this mean reduce will be 
executed once on every trigger per partition with all the items in each 
partition ?

Thanks 

On Wednesday, January 3, 2018 2:46 AM, Stefan Richter 
 wrote:
 

 Hi,
I would interpret this as: the reduce produces an output for every new reduce 
call, emitting the updated value. There is no need for a window because it 
kicks in on every single invocation.
Best,Stefan


Am 31.12.2017 um 22:28 schrieb M Singh :
Hi:
Apache Flink documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html)
 indicates that a reduce function on a KeyedStream  as follows:
A "rolling" reduce on a keyed data stream. Combines the current element with 
the last reduced value and emits the new value. 

A reduce function that creates a stream of partial sums:keyedStream.reduce(new 
ReduceFunction() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
The KeyedStream is not windowed, so when does the reduce function kick in to 
produce the DataStream (ie, is there a default time out, or collection size 
that triggers it, since we have not defined any window on it).
Thanks
Mans



   

Re: Pending parquet file with Bucking Sink

2018-01-03 Thread Aljoscha Krettek
Hi,

Your analysis is correct. If the program ends before we can do a checkpoint 
files will never be moved to "final" state. We could move all files to "final" 
stage when the Sink is closing but the problem here is that Flink currently 
doesn't provide a way for user functions (which Sinks are) to distinguish 
between "erroneous close" and "close because of stream end" so we cannot 
currently do this. We are aware of the problem and this is the Jira Issue for 
tracking it: https://issues.apache.org/jira/browse/FLINK-2646 


Best,
Aljoscha 

> On 20. Dec 2017, at 19:05, xiatao123  wrote:
> 
> Hi Vipul,
>  Thanks for the information.  Yes, I do have checkpointing enabled with 10
> millisecs.
>  I think the issue here is that the stream ended before the checkpoint
> reached.  This is a testing code that the DataStream only have 5 events then
> it ended. Once the stream ended, the checkpoint is not triggered, then the
> file remains in "pending" state.
>  Anyway we can force a checkpoint trigger? or let the sink know the stream
> ended? 
> Thanks,
> Tao
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: scala 2.12 support/cross-compile

2018-01-03 Thread Hao Sun
Thanks Stephan and Alhoscha for the info!

On Wed, Jan 3, 2018 at 2:41 AM Aljoscha Krettek  wrote:

> Hi,
>
> This is the umbrella issue for Scala 2.12 support. As Stephan pointed out,
> the ClosureCleaner and SAMs are currently the main problems. The first is
> also a problem for Spark, which track their respective progress here:
> https://issues.apache.org/jira/browse/SPARK-14540
> .
>
> Best,
> Aljoscha
>
>
> On 3. Jan 2018, at 10:39, Stephan Ewen  wrote:
>
> Hi Hao Sun!
>
> This is work in progress, but Scala 2.12 is a bit tricky. I think the
> Scala folks have messed this version up a bit, to be honest.
>
> The main blockers is that Scala 2.12 breaks some classes through its
> addition of SAM interface lambdas (similar to Java). Many of the DataStream
> API classes have two method variants (one with a Scala Function, one with a
> Java SAM interface) which now become ambiguously overloaded methods in
> Scala 2.12.
>
> In addition, Scala 2.12 also needs a different closure cleaner, because
> Scala 2.12 compiles differently.
>
> I am adding Aljoscha, who has started working on this...
>
> Best,
> Stephan
>
>
> On Wed, Jan 3, 2018 at 4:13 AM, Hao Sun  wrote:
>
>> Hi team, I am wondering if there is a schedule to support scala 2.12?
>> If I need flink 1.3+ with scala 2.12, do I just have to cross compile
>> myself? Is there anything blocking us from using scala 2.12?
>>
>> Thanks
>>
>
>
>


Re: state.checkpoints.dir not configured

2018-01-03 Thread Aljoscha Krettek
Hi,

I think what might have happened is that you had an earlier JobManager still 
running that had the old configuration loaded. Then you tried starting a new 
JobManager. Could that be the case?

Best,
Aljoscha

> On 21. Dec 2017, at 16:34, Plamen Paskov  
> wrote:
> 
> I'm sorry but i already cleaned up the logs. If i encounter the same error 
> again i will let you know
> 
> 
> On 21.12.2017 17:12, Ufuk Celebi wrote:
>> Could you please share the complete logs of the initial failure? What
>> you describe in your second email should not happen ;-) If the
>> JobManager cannot bind to the port it should simply die and not
>> complain about checkpoint configuration.
>> 
>> – Ufuk
>> 
>> On Thu, Dec 21, 2017 at 1:21 PM, Plamen Paskov
>>  wrote:
>>> I inspected the log as you suggest and found that 6123 port was used by
>>> another process. I free the port and restarted the job manager. Now
>>> everything looks fine. The error message is little misleading as the real
>>> cause is that 6123 is already bind but it says that state.checkpoints.dir is
>>> not set.
>>> 
>>> Thanks
>>> 
>>> 
>>> 
>>> On 19.12.2017 17:55, Ufuk Celebi wrote:
 When the JobManager/TaskManager are starting up they log what config
 they are loading. Look for lines like
 
 "Loading configuration property: {}, {}"
 
 Do you find the required configuration as part of these messages?
 
 – Ufuk
 
 
 On Tue, Dec 19, 2017 at 3:45 PM, Plamen Paskov
  wrote:
> Hi,
> I'm trying to enable externalized checkpoints like this:
> 
> env.enableCheckpointing(1000);
> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> 
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new
> FsStateBackend("file:///tmp/flink-checkpoints-data/", true));
> 
> in flink-conf.yaml i set:
> state.checkpoints.dir: file:///tmp/flink-checkpoints-meta/
> 
> but when i run the application i get this error:
> java.lang.IllegalStateException: CheckpointConfig says to persist
> periodic
> checkpoints, but no checkpoint directory has been configured. You can
> configure configure one via key 'state.checkpoints.dir'.
> 
> Any suggestions?
> 
> Thanks
>>> 
> 



Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-03 Thread Kyle Hamlin
Hello Stephan & Nico,

Here is the full stacktrace, its not much more than what I originally
posted. I remember seeing an XMLInputFactory input error at one point, but
I haven't seen that again. Is there any other information I can provide
that will help resolve?

[flink-1.4.0] ./bin/flink run ~/streaming.jar
Cluster configuration: Standalone cluster with JobManager at localhost/
127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: 022e4a310cd56ba4f7befc0286921663. Waiting for
job completion.
Connected to JobManager at
Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259]
with leader session id ----.
01/03/2018 11:21:53 Job execution switched to status RUNNING.
01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to
SCHEDULED
01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to
DEPLOYING
01/03/2018 11:21:53 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
01/03/2018 11:21:54 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
java.lang.NoClassDefFoundError: Could not initialize class
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:363)
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:542)
at
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
at
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
at
org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:99)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

01/03/2018 11:21:54 Job execution switched to status FAILING.
java.lang.NoClassDefFoundError: Could not initialize class
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:363)
at
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:542)
at
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
at
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
at
org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:99)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
01/03/2018 11:21:54 Job execution switched to status RESTARTING.

On Wed, Jan 3, 2018 at 11:33 AM Stephan Ewen  wrote:

> The error is not a the missing Class "S3ErrorResponseHandler", but the
> initialization of that class.
>
> It could be missing classes that are statically referenced by the
> 

Re: Flink CEP with event time

2018-01-03 Thread Dawid Wysakowicz
Hi shashank,

What version of flink are you using? Is it possible that you are hitting this 
issue: https://issues.apache.org/jira/browse/FLINK-7563 ?

Watermark semantics in CEP was buggy and events were processed only if its 
timestamp was lower than current watermark while it should be lower or equal.

Best
Dawid

> On 3 Jan 2018, at 17:05, shashank agarwal  wrote:
> 
> ssed A with origTimestamp Y. (



signature.asc
Description: Message signed with OpenPGP


Re: Flink CEP with event time

2018-01-03 Thread Aljoscha Krettek
What are the actual timestamps? If your BoundedOutOfOrderness extractor is 
lagging by 10 seconds then only seeing Event 1.B would not trigger execution. 
Only the later Event 2.A is sufficiently far ahead to trigger execution, which 
you actually get.

> On 3. Jan 2018, at 17:05, shashank agarwal  wrote:
> 
> Low Watermark is showing the same value which I am passing in event 
> "1514994744412" for all the tasks related to that stream, (No watermark) is 
> showing for Kafka source in UI.
> 
> So the pattern is following for CEP A followedBy B :
> 
> Event 1
> - I passed A with origTimestamp X. (Low watermark updated to X)  : No results 
>  (this is right )
> - I passed B with origTimestamp X1. (Low watermark updated to X1)  : No 
> results (results should be printed)
> 
> Event 2
> - I passed A with origTimestamp Y. (Low watermark updated to Y)  : Results of 
> Event 1 printed  (this is wrong )
> - I passed B with origTimestamp Y1. (Low watermark updated to Y1)  : No 
> results (results should be printed)
> 
> Event 3
> - I passed A with origTimestamp Z. (Low watermark updated to Z)  : Results of 
> Event 2 printed  (this is wrong )
> - I passed B with origTimestamp Z1. (Low watermark updated to Z1)  : No 
> results (results should be printed)
> 
> 
> 
> ‌
> 
> On Wed, Jan 3, 2018 at 8:30 PM, Aljoscha Krettek  > wrote:
> Can you please check what the input watermark of your operations is? There is 
> a metric called "currentLowWatermark" for this.
> 
> Best,
> Aljoscha
> 
>> On 3. Jan 2018, at 15:54, shashank agarwal > > wrote:
>> 
>> Actually, In Kafka there are other topics also (around 5-6 topics) I am 
>> consuming particular topic 'x' which only contains events.  Other topics 
>> have different data. 
>> 
>> I am using two consumers in my program for 2 different topics. in first 
>> topic x i am extracting the timestamp from origintimestamp variable in other 
>> one i am using system current millis.
>> 
>> 
>> 
>> 
>> ‌
>> 
>> On Wed, Jan 3, 2018 at 8:06 PM, Aljoscha Krettek > > wrote:
>> Ok, but will there be events in all Kafka partitions/topics?
>> 
>> 
>>> On 3. Jan 2018, at 15:33, shashank agarwal >> > wrote:
>>> 
>>> Hi,
>>> 
>>> Yes, Events will always carry a variable OriginTimestamp which I am using 
>>> in the extractor. I have used fallback also in case of data missing will 
>>> put System current millis. 
>>> 
>>> Still, it's not printing results.
>>> 
>>> Best,
>>> Shashank
>>> 
>>> 
>>> 
>>> 
>>> 
>>> ‌
>>> 
>>> On Wed, Jan 3, 2018 at 7:40 PM, Aljoscha Krettek >> > wrote:
>>> Hi,
>>> 
>>> Are all the partitions always carrying data that has advancing timestamps? 
>>> When using Event-time the Kafka source (and Flink in general) needs to have 
>>> steady progress in all partitions, otherwise the watermark does not 
>>> advance, which in turn means that processing will be stalled downstream.
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>> 
 On 3. Jan 2018, at 14:29, shashank agarwal > wrote:
 
 Hello,
 
 I have some patterns in my program. For an example,
 
  A followedBy B.
 
 As I am using kafka source and my event API's using load balancers so 
 sometimes B comes before A. So my CEP doesn't generate any result for 
 those events. 
 
 I have then tried event time and applied 
 "BoundedOutOfOrdernessTimestampExtractor" on kafkasource with extract time 
 from an origin time variable which I have in the event. I am using 
 watermark lateness of 10 seconds in that.
 
 Now CEP stopped generating results. It's not even generating results where 
 Event B comes after A. I have tried within (10 seconds) in CEP also still 
 not generating results. 
 
 Am I doing anything wrong?
 
 I have to cover the case where B can come after A from Kafka.
 
 -- 
 Thanks Regards
 
 SHASHANK AGARWAL
  ---  Trying to mobilize the things
 
 
 
 
 ‌
>>> 
>>> 
>>> 
>>> 
>>> -- 
>>> Thanks Regards
>>> 
>>> SHASHANK AGARWAL
>>>  ---  Trying to mobilize the things
>> 
>> 
>> 
>> 
>> -- 
>> Thanks Regards
>> 
>> SHASHANK AGARWAL
>>  ---  Trying to mobilize the things
>> 
> 
> 
> 
> 
> -- 
> Thanks Regards
> 
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things



Re: Flink CEP with event time

2018-01-03 Thread shashank agarwal
Low Watermark is showing the same value which I am passing in event
"1514994744412" for all the tasks related to that stream, (No watermark) is
showing for Kafka source in UI.

So the pattern is following for CEP A followedBy B :

Event 1
- I passed A with origTimestamp X. (Low watermark updated to X)  : No
results  (this is right )
- I passed B with origTimestamp X1. (Low watermark updated to X1)  : No
results (results should be printed)

Event 2
- I passed A with origTimestamp Y. (Low watermark updated to Y)  : Results
of Event 1 printed  (this is wrong )
- I passed B with origTimestamp Y1. (Low watermark updated to Y1)  : No
results (results should be printed)

Event 3
- I passed A with origTimestamp Z. (Low watermark updated to Z)  : Results
of Event 2 printed  (this is wrong )
- I passed B with origTimestamp Z1. (Low watermark updated to Z1)  : No
results (results should be printed)



‌

On Wed, Jan 3, 2018 at 8:30 PM, Aljoscha Krettek 
wrote:

> Can you please check what the input watermark of your operations is? There
> is a metric called "currentLowWatermark" for this.
>
> Best,
> Aljoscha
>
> On 3. Jan 2018, at 15:54, shashank agarwal  wrote:
>
> Actually, In Kafka there are other topics also (around 5-6 topics) I am
> consuming particular topic 'x' which only contains events.  Other topics
> have different data.
>
> I am using two consumers in my program for 2 different topics. in first
> topic x i am extracting the timestamp from origintimestamp variable in
> other one i am using system current millis.
>
>
>
>
> ‌
>
> On Wed, Jan 3, 2018 at 8:06 PM, Aljoscha Krettek 
> wrote:
>
>> Ok, but will there be events in all Kafka partitions/topics?
>>
>>
>> On 3. Jan 2018, at 15:33, shashank agarwal  wrote:
>>
>> Hi,
>>
>> Yes, Events will always carry a variable OriginTimestamp which I am using
>> in the extractor. I have used fallback also in case of data missing will
>> put System current millis.
>>
>> Still, it's not printing results.
>>
>> Best,
>> Shashank
>>
>>
>>
>>
>>
>> ‌
>>
>> On Wed, Jan 3, 2018 at 7:40 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> Are all the partitions always carrying data that has advancing
>>> timestamps? When using Event-time the Kafka source (and Flink in general)
>>> needs to have steady progress in all partitions, otherwise the watermark
>>> does not advance, which in turn means that processing will be stalled
>>> downstream.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 3. Jan 2018, at 14:29, shashank agarwal 
>>> wrote:
>>>
>>> Hello,
>>>
>>> I have some patterns in my program. For an example,
>>>
>>>  A followedBy B.
>>>
>>> As I am using kafka source and my event API's using load balancers so
>>> sometimes B comes before A. So my CEP doesn't generate any result for those
>>> events.
>>>
>>> I have then tried event time and applied 
>>> "BoundedOutOfOrdernessTimestampExtractor"
>>> on kafkasource with extract time from an origin time variable which I have
>>> in the event. I am using watermark lateness of 10 seconds in that.
>>>
>>> Now CEP stopped generating results. It's not even generating results
>>> where Event B comes after A. I have tried within (10 seconds) in CEP also
>>> still not generating results.
>>>
>>> Am I doing anything wrong?
>>>
>>> I have to cover the case where B can come after A from Kafka.
>>>
>>> --
>>> Thanks Regards
>>>
>>> SHASHANK AGARWAL
>>>  ---  Trying to mobilize the things
>>>
>>>
>>>
>>>
>>> ‌
>>>
>>>
>>>
>>
>>
>> --
>> Thanks Regards
>>
>> SHASHANK AGARWAL
>>  ---  Trying to mobilize the things
>>
>>
>>
>
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things
>
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things


Re: periodic trigger

2018-01-03 Thread Piotr Nowojski
Hi,

Sorry for late response (because of the holiday period).

You didn’t mention lateness previously, that’s why I proposed such solution. 

Another approach would be to calculate max session length per user on the first 
aggregation level and at the same time remember what was the previously 
emitted/triggered value. Now, whenever you recalculate your window because of 
firing a trigger, you could check what is the new value and what was the 
previously emitted value. If they are the same, you do not have to emit 
anything. If they are different, you would have to issue an “update”/“diff” to 
the global aggregation on the second level. In case of simple average of 
session length, first level on update could emit “-old_value” and “+new_value” 
(negative old_value and positive new_value). 

To do this in Flink you could use 

org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction

and store the previously emitted values in

org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context#windowState()

For efficiency reasons best to combine it with a reduce function using this call

org.apache.flink.streaming.api.datastream.WindowedStream#reduce(org.apache.flink.api.common.functions.ReduceFunction,
 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction)

Reduce will ensure that your ProcessWindowFunction will not have to process all 
events belonging to the window each time it is triggered, but only it will have 
to process the single reduced element.

In your case:

.keyBy("userId", "sessionId")
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
.reduce(MAX_BY_LENGTH_REDUCE_FUNCTION, 
MY_FANCY_PROCESS_WINDOW_FUNCTION_WITH_UPDATES).

Of course second level global aggregation would have to understand and take 
those “update”/“diffs” into account, but that would be simple to implement by 
some custom reduce function/aggregate. You would like the sequence of values 1, 
1, 10, -10, 1 produce average of 1 and not 0.6 - negative values have to 
decrease counters in calculating the average value (to aggregate average value 
you always need to keep sum of values and a counter).

.timeWindowAll(Time.seconds(60))
.aggregate(FANCY_AGGREGATE_THAT_HANDLES_UPDATES)
.print();

Hope that helps.

Piotrek

> On 22 Dec 2017, at 14:10, Plamen Paskov  wrote:
> 
> I think it will not solve the problem as if i set ContinuousEventTimeTrigger 
> to 10 seconds and allowedLateness(Time.seconds(60)) as i don't want to 
> discard events from different users received later then i might receive more 
> than one row for a single user based on the number of windows created by the 
> events of this user. That will make the the average computations wrong.
> 
> On 22.12.2017 12:10, Piotr Nowojski wrote:
>> Ok, I think now I understand your problem. 
>> 
>> Wouldn’t it be enough, if you change last global window to something like 
>> this:
>> 
>> lastUserSession
>> .timeWindowAll(Time.seconds(10))
>> .aggregate(new AverageSessionLengthAcrossAllUsers())
>> .print();
>> 
>> (As a side note, maybe you should use ContinousEventTimeTrigger in the first 
>> window). This way it will aggregate and calculate average session length of 
>> only last “preview results” of the 60 seconds user windows (emitted every 10 
>> seconds from the first aggregation).
>> 
>> Piotrek
>> 
>>> On 21 Dec 2017, at 15:18, Plamen Paskov >> > wrote:
>>> 
>>> Imagine a case where i want to run a computation every X seconds for 1 day 
>>> window. I want the calculate average session length for current day every X 
>>> seconds. Is there an easy way to achieve that?
>>> 
>>> On 21.12.2017 16:06, Piotr Nowojski wrote:
 Hi,
 
 You defined a tumbling window 
 (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows
  
 )
  of 60 seconds, triggered every 10 seconds. This means that each input 
 element can be processed/averaged up to 6 times (there is no other way if 
 you trigger each window multiple times).
 
 I am not sure what are you trying to achieve, but please refer to the 
 documentation about different window types (tumbling, sliding, session) 
 maybe it will clarify things for you:
 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
  
 
 
 If you want to avoid duplicated processing, use either tumbling window 
 with default trigger (triggering at the end of the window), or use session 
 windows.
 
 Piotrek

Re: BackPressure handling

2018-01-03 Thread Vishal Santoshi
Absolutely.

   But without a a view into a global WM at the source level, the --> would
require sources to wait for sources that are "slower in time" -->  is not
possible for folks creating custom sources on extending existing ones. I
would have loved to use a more scientific/data drive approach to slow down
the aggressive consumers but   without an fundamental API change where
sources know the current global WM it is not possible . Further resources
are at a premium as in if I can and should be able to execute should rqs, a
replay or a real time should not show differing characteristics IMHO.

The more I work with Flink, the more I realize that the crux of Flink is WM
generation ( if we are doing Event Processing ) and I would want the
current WM at an Operator  to be exposed at all places feasible. I have
another email thread on why WM driven preemptive analysis of an ordered
list of Events within a Window is desirable but could not do it using the
natural API ( Accumulator based ) b'coz of paucity of WM view.

Anywaz it does work as on now but I know it is an interim solution.

Regards

Vishal

On Wed, Jan 3, 2018 at 9:55 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I think your analysis is very thorough and accurate. However, I don't
> think that https://issues.apache.org/jira/browse/FLINK-7282 will solve
> this problem. We're dealing with "time back-pressure" here and not
> traditional processing back-pressure. (Not sure if there's a term for this
> so I coined a new one... ). The problem is that some Kafka consumers
> progress faster in event-time than others which leads to windows being
> spawned at the window operator that only trigger (and get deleted) once all
> sources catch up to that point in event time.
>
> Simply slowing down all reading mitigates this somewhat but a proper
> solution would require sources to wait for sources that are "slower in
> time".
>
> Does that make sense to you?
>
> Best,
> Aljoscha
>
>
> On 3. Jan 2018, at 15:45, Vishal Santoshi 
> wrote:
>
> To add and an interim solution to the issue.
>
> I extended the based on the advise "custom source/adapt an existing
> source" and put in a RateLimiter ( guava ) that effectively put a cap on
> each kafka consumer  ( x times the expected incident rqs ). That solved the
> issue  as in it stabilized the flow into down stream window operation ( I
> use ProcessFunction for sessionizing and why is another discussion ) .
>
>  This leads me to these conclusions and either of them could be correct
>
>
> * The kakfa partitions are on different brokers ( the leaders ) and based
> on the how efficient the broker is ( whether it has data in OS Cache  or
> whether there is a skew in leader distribution and thus more stress on a n
> of m brokers) the consumption speed can vary.
> * The skew caused by number of consumers to number of flink nodes ( if no.
> of partitions % no of flink nodes == 0 there is no skew ) the consumption
> rate can vary.
> * Some TM nodes may be sluggish.
>
> Either ways any of the above reasons can cause data to be consumed at
> different speeds which could lead to an imbalance and b'coz of the paucity
> of fine grained back pressure handling leads to more windows that remain
> open, windows that reflect the more aggressive consumption ( and thus more
> variance from the current WM)  than prudent, causing the pathological case
> described above. By regulating the consumption rate ( I put the delimiter
> in the extractTimestamp method ) , it effectively caused the more
> aggressive consumptions to a fixed upper bound, making the rate of
> consumption across consumers effectively similar.
>
> Either ways it seems imperative that https://issues.apache.
> org/jira/browse/FLINK-7282 should be finalized at the earliest. The
> consequences on a shared flink cluster are too huge IMHO.
>
> Please tell me if my conclusions are problematic or do not make sense.
>
>
> Regards
>
> Vishal
>
>
>
>
>
>
> On Tue, Jan 2, 2018 at 3:04 PM, Vishal Santoshi  > wrote:
>
>> Also note that if I were to start 2 pipelines
>>
>> 1. Working off the head of the topic and thus not prone to the
>> pathological case described above
>> 2. Doing a replay and thus prone to the  pathological case described above
>>
>> Than the 2nd pipe will stall the 1st pipeline. This seems to to point to
>>
>>- All channels multiplexed into the same TCP connection stall
>>together, as soon as one channel has backpressure.
>>
>>
>> of the jira issue. This has to be a priority IMHO, in a shared VM where
>> jobs should have at least some isolation.
>>
>> On Tue, Jan 2, 2018 at 2:19 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Thank you.
>>>
>>> On Tue, Jan 2, 2018 at 1:31 PM, Nico Kruber 
>>> wrote:
>>>
 Hi Vishal,
 let me already point you towards the JIRA issue for the credit-based
 flow control: 

Fwd: Queryable State Client - Actor Not found Exception

2018-01-03 Thread Velu Mitwa
Hi,
I am running a Flink Job which uses the Queryable State feature of Apache
Flink(1.3.2). I was able to do that in local mode. When I try to do that in
Cluster mode (Yarn Session), I am getting Actor not found Exception.

Please help me to understand what is missing.

*Exception Trace*


Query failed because of the following Exception:
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.
tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(
ActorSelection.scala:65)
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(
ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(
BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(
BatchingExecutor.scala:73)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.
scala:120)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.
scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(
Promise.scala:248)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(
AskSupport.scala:334)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at scala.concurrent.Future$InternalCallbackExecutor$.
scala$concurrent$Future$InternalCallbackExecutor$$
unbatchedExecute(Future.scala:694)
at scala.concurrent.Future$InternalCallbackExecutor$.
execute(Future.scala:691)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(
Scheduler.scala:474)
at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(
Scheduler.scala:425)
at akka.actor.LightArrayRevolverScheduler$$
anon$8.nextTick(Scheduler.scala:429)
at akka.actor.LightArrayRevolverScheduler$$
anon$8.run(Scheduler.scala:381)
at java.lang.Thread.run(Thread.java:745)

*Client Creation Snippet *

* Configuration config = new Configuration();*
*config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
jobManagerHost);*
*config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
jobManagerPort);*

*final HighAvailabilityServices highAvailabilityServices =
HighAvailabilityServicesUtils*
*.createHighAvailabilityServices(config,
Executors.newSingleThreadScheduledExecutor(),*
*
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);*

*this.client = new QueryableStateClient(config,
highAvailabilityServices);*
*  }*


Re: Flink CEP with event time

2018-01-03 Thread Aljoscha Krettek
Can you please check what the input watermark of your operations is? There is a 
metric called "currentLowWatermark" for this.

Best,
Aljoscha

> On 3. Jan 2018, at 15:54, shashank agarwal  wrote:
> 
> Actually, In Kafka there are other topics also (around 5-6 topics) I am 
> consuming particular topic 'x' which only contains events.  Other topics have 
> different data. 
> 
> I am using two consumers in my program for 2 different topics. in first topic 
> x i am extracting the timestamp from origintimestamp variable in other one i 
> am using system current millis.
> 
> 
> 
> 
> ‌
> 
> On Wed, Jan 3, 2018 at 8:06 PM, Aljoscha Krettek  > wrote:
> Ok, but will there be events in all Kafka partitions/topics?
> 
> 
>> On 3. Jan 2018, at 15:33, shashank agarwal > > wrote:
>> 
>> Hi,
>> 
>> Yes, Events will always carry a variable OriginTimestamp which I am using in 
>> the extractor. I have used fallback also in case of data missing will put 
>> System current millis. 
>> 
>> Still, it's not printing results.
>> 
>> Best,
>> Shashank
>> 
>> 
>> 
>> 
>> 
>> ‌
>> 
>> On Wed, Jan 3, 2018 at 7:40 PM, Aljoscha Krettek > > wrote:
>> Hi,
>> 
>> Are all the partitions always carrying data that has advancing timestamps? 
>> When using Event-time the Kafka source (and Flink in general) needs to have 
>> steady progress in all partitions, otherwise the watermark does not advance, 
>> which in turn means that processing will be stalled downstream.
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 3. Jan 2018, at 14:29, shashank agarwal >> > wrote:
>>> 
>>> Hello,
>>> 
>>> I have some patterns in my program. For an example,
>>> 
>>>  A followedBy B.
>>> 
>>> As I am using kafka source and my event API's using load balancers so 
>>> sometimes B comes before A. So my CEP doesn't generate any result for those 
>>> events. 
>>> 
>>> I have then tried event time and applied 
>>> "BoundedOutOfOrdernessTimestampExtractor" on kafkasource with extract time 
>>> from an origin time variable which I have in the event. I am using 
>>> watermark lateness of 10 seconds in that.
>>> 
>>> Now CEP stopped generating results. It's not even generating results where 
>>> Event B comes after A. I have tried within (10 seconds) in CEP also still 
>>> not generating results. 
>>> 
>>> Am I doing anything wrong?
>>> 
>>> I have to cover the case where B can come after A from Kafka.
>>> 
>>> -- 
>>> Thanks Regards
>>> 
>>> SHASHANK AGARWAL
>>>  ---  Trying to mobilize the things
>>> 
>>> 
>>> 
>>> 
>>> ‌
>> 
>> 
>> 
>> 
>> -- 
>> Thanks Regards
>> 
>> SHASHANK AGARWAL
>>  ---  Trying to mobilize the things
> 
> 
> 
> 
> -- 
> Thanks Regards
> 
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things
> 



Re: A question about Triggers

2018-01-03 Thread Vishal Santoshi
Dear Fabian,

   I was able to create a pretty functional ProcessFunction and
here is the synopsis and please see if it makes sense.

Sessionization is unique as in it entails windows of dynamic length. The
way flink approaches is pretty simple. It will create a TimeWindow of size
"gap" relative to the event time, find an overlapping window ( intersection
) and create a covering window. Each such window has a "state" associated
with it, which too has to be merged when a cover window is created on
intersection of 2 or more incident windows.To be more precise if Window1
spans ( t1, t2 ) and a new record creates a window ( t3, t4 ) and  t1<=t3<=t2
a new Window is created ( t1, t4 ) and the associated states are merged.


In the current Window API the states are external and are
Accumulator based. This approach pretty much works for all cases where the
aggregation is accumulative/reduced  and does not depend on order, as in no
order list of incoming records needs to be kept and reduction is to a
single aggregated element ( think counts, min max etc). In path analysis (
and other use cases ) however this approach has drawbacks. Even though in
our accumulator we could keep an ordered list of events it becomes
unreasonable if not within bounds. An approach that does *attempt* to bind
state, is to preemptively analyze paths using the WM as the marker that
defines the *subset* of the state that is safe to analyze. So if we have n
events in the window state and m fall before WM, we can safely analyze the
m subset, emitting paths seen and reducing the cumulative state size. There
are caveats though that I will go into later.


Unfortunately the Accumulators in Flink Window runtime defaults do not have
access to the WM.


This lead to this generic approach  ( implemented and tested )


* Use a low level ProcessFunction that allows access to WM and definitely
nearer to the guts of Flink.


* Still use the merge Windows on intersection approach but use WM to
trigger ( through Timers)  reductions in state. This is not very dissimilar
to what Flink does but we have more control over what to do and when to do
it. Essentially have exposed a lifecycle method that reacts to WM
progression.


* There are essentially 2 Timers. The first timer is the maxTimeStamp() of
a Window, which if there is no further mutation b'coz of merge etc will
fire to reflect a Session End. The second one is  on currentWaterMark+1
that essentially calls a "reduceToWM" on each keyed Window and thus State.


* There are 2 ways to short circuit a Session 1. On Session time span 2. On
Session size.


* There is a safety valve to blacklist keys when it is obvious that it is a
bot ( again


The solution will thus preemptively push out Patterns ( and correct
patterns ) while keeping the ordered state within reasonable bounds. The
incident data of course has to be analyzed . Are the paths to large etc.
But one has full control over how to fashion the solution.




Regards and Thanks,


Vishal









On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi  wrote:

> This makes sense.  Thanks.
>
> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> all calls to onElement() or onTimer() are syncronized for any keys. Think
>> of a single thread calling these methods.
>> Event-time timers are called when a watermark passes the timer.
>> Watermarks are received as special records, so the methods are called in
>> the same order as records (actual records or watermarks) arrive at the
>> function. Only for processing-time timers, actual synchronization is
>> required.
>>
>> The NPE might be thrown because of two timers that fire one after the
>> other without a new record being processed in between the onTimer() calls.
>> In that case the state is cleared in the first call and null in the second.
>>
>> Best, Fabian
>>
>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi :
>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>> I have a few follow up questions regarding ProcessFunction. I think
>>> that the core should take care of any synchronization issues between calls
>>> to onElement and onTimer in case of a keyed stream but tests do not seem to
>>> suggest that.
>>>
>>>
>>>
>>> I have  specifically 2 questions.
>>>
>>>
>>> 1.  Are calls  to onElement(..) single threaded if scoped to a key ? As
>>> in on a keyed stream, is there a  way that 2 or more threads can
>>> execute on the more than one element of a single key at one time ? Would I
>>> have to synchronize this construction
>>>
>>>
>>>
>>>
>>> *OUT accumulator = accumulatorState.value();if (accumulator == 
>>> null) {accumulator = acc.createNew();}*
>>>
>>>
>>>
>>> 2. Can concurrent calls happen  onTimer(..) and onElement(..) for the
>>> same key ? I intend to clean up state but I see  NullPointers in
>>> OnTimer(..) thrown and I presume it is b'coz the onElement and onTimer are
>>> executed on 2  separate 

Re: BackPressure handling

2018-01-03 Thread Aljoscha Krettek
Hi,

I think your analysis is very thorough and accurate. However, I don't think 
that https://issues.apache.org/jira/browse/FLINK-7282 
 will solve this problem. 
We're dealing with "time back-pressure" here and not traditional processing 
back-pressure. (Not sure if there's a term for this so I coined a new one... 
). The problem is that some Kafka consumers progress faster in event-time than 
others which leads to windows being spawned at the window operator that only 
trigger (and get deleted) once all sources catch up to that point in event time.

Simply slowing down all reading mitigates this somewhat but a proper solution 
would require sources to wait for sources that are "slower in time".

Does that make sense to you?

Best,
Aljoscha

> On 3. Jan 2018, at 15:45, Vishal Santoshi  wrote:
> 
> To add and an interim solution to the issue.
> 
> I extended the based on the advise "custom source/adapt an existing source" 
> and put in a RateLimiter ( guava ) that effectively put a cap on each kafka 
> consumer  ( x times the expected incident rqs ). That solved the issue  as in 
> it stabilized the flow into down stream window operation ( I use 
> ProcessFunction for sessionizing and why is another discussion ) . 
> 
>  This leads me to these conclusions and either of them could be correct
> 
> 
> * The kakfa partitions are on different brokers ( the leaders ) and based on 
> the how efficient the broker is ( whether it has data in OS Cache  or whether 
> there is a skew in leader distribution and thus more stress on a n of m 
> brokers) the consumption speed can vary.
> * The skew caused by number of consumers to number of flink nodes ( if no. of 
> partitions % no of flink nodes == 0 there is no skew ) the consumption rate 
> can vary.
> * Some TM nodes may be sluggish.
> 
> Either ways any of the above reasons can cause data to be consumed at 
> different speeds which could lead to an imbalance and b'coz of the paucity of 
> fine grained back pressure handling leads to more windows that remain open, 
> windows that reflect the more aggressive consumption ( and thus more variance 
> from the current WM)  than prudent, causing the pathological case described 
> above. By regulating the consumption rate ( I put the delimiter in the 
> extractTimestamp method ) , it effectively caused the more aggressive 
> consumptions to a fixed upper bound, making the rate of consumption across 
> consumers effectively similar.
> 
> Either ways it seems imperative that 
> https://issues.apache.org/jira/browse/FLINK-7282 
>  should be finalized at the 
> earliest. The consequences on a shared flink cluster are too huge IMHO.
> 
> Please tell me if my conclusions are problematic or do not make sense.
> 
> 
> Regards 
> 
> Vishal
> 
> 
> 
> 
> 
> 
> On Tue, Jan 2, 2018 at 3:04 PM, Vishal Santoshi  > wrote:
> Also note that if I were to start 2 pipelines 
> 
> 1. Working off the head of the topic and thus not prone to the pathological 
> case described above
> 2. Doing a replay and thus prone to the  pathological case described above
> 
> Than the 2nd pipe will stall the 1st pipeline. This seems to to point to
> All channels multiplexed into the same TCP connection stall together, as soon 
> as one channel has backpressure.
> 
> of the jira issue. This has to be a priority IMHO, in a shared VM where jobs 
> should have at least some isolation.
> 
> On Tue, Jan 2, 2018 at 2:19 PM, Vishal Santoshi  > wrote:
> Thank you.
> 
> On Tue, Jan 2, 2018 at 1:31 PM, Nico Kruber  > wrote:
> Hi Vishal,
> let me already point you towards the JIRA issue for the credit-based
> flow control: https://issues.apache.org/jira/browse/FLINK-7282 
> 
> 
> I'll have a look at the rest of this email thread tomorrow...
> 
> 
> Regards,
> Nico
> 
> On 02/01/18 17:52, Vishal Santoshi wrote:
> > Could you please point me to any documentation on the  "credit-based
> > flow control" approach
> >
> > On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther  > 
> > >> wrote:
> >
> > Hi Vishal,
> >
> > your assumptions sound reasonable to me. The community is currently
> > working on a more fine-grained back pressuring with credit-based
> > flow control. It is on the roamap for 1.5 [1]/[2]. I will loop in
> > Nico that might tell you more about the details. Until then I guess
> > you have to implement a custom source/adapt an existing source to
> > let the data flow in more realistic.
> >
> > Regards,
> > Timo
> >
> > [1]
> > 
> > 

Re: does the flink sink only support bio?

2018-01-03 Thread Stefan Richter
I think a mix of async UPDATES and exactly-once all this might be tricky, and 
the typical use case for async IO is more about reads. So let’s take a step 
back: what would you like to achieve with this? Do you want a 
read-modify-update (e.g. a map function that queries and updates a DB) or just 
updates (like a sink based that goes against a DB). From the previous question, 
I assume the second case applies, in which case I wonder why you even need to 
be async for a sink? I think a much better approach could be based on Flink's 
TwoPhaseCommitSinkFunction, and maybe use some some batching to lower update 
costs.

On top of the TwoPhaseCommitSinkFunction, you could implement transactions 
against your DB, similar to e.g. this example with Postgres: 
http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
 

 .

Does this help or do you really need async read-modify-update?

Best,
Stefan

> Am 03.01.2018 um 15:08 schrieb Jinhua Luo :
> 
> No, I mean how to implement exactly-once db commit (given our async io
> target is mysql), not the state used by flink.
> As mentioned in previous mail, if I commit db in
> notifyCheckpointComplete, we have a risk to lost data (lost commit,
> and flink restart would not trigger notifyCheckpointComplete for the
> last checkpoint again).
> On the other hand, if I update and commit per record, the sql/stored
> procedure have to handle duplicate updates at failure restart.
> 
> So, when or where to commit so that we could get exactly-once db ingress.
> 
> 2018-01-03 21:57 GMT+08:00 Stefan Richter :
>> 
>> Hi,
>> 
>> 
>> Then how to implement exactly-once async io? That is, neither missing
>> data or duplicating data.
>> 
>> 
>> From the docs about async IO here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
>> :
>> 
>> "Fault Tolerance Guarantees:
>> The asynchronous I/O operator offers full exactly-once fault tolerance
>> guarantees. It stores the records for in-flight asynchronous requests in
>> checkpoints and restores/re-triggers the requests when recovering from a
>> failure.“
>> 
>> So it is already handled by Flink in a way that supports exactly-once.
>> 
>> Is there some way to index data by checkpoint id and records which
>> checkpoints already commit to db? But that means we need MapState,
>> right?
>> 
>> 
>> The information required depends a bit on the store that you are using,
>> maybe the last confirmed checkpoint id is enough, but maybe you require
>> something more. This transaction information is probably not „by-key“, but
>> „per-operator“, so I would suggest to use operator state (see next answer).
>> Btw the implementation of async operators does something very similar to
>> restore pending requests, and you can see the code in „AsyncWaitOperator".
>> 
>> 
>> However, the async-io operator normally follows other operators, e.g.
>> fold, so it normally faces the DataStream but not KeyedStream, and
>> DataStream only supports ListState, right?
>> 
>> 
>> You can use non-keyed state, aka operator state, to store such information.
>> See here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
>> . It does not require a KeyedSteam.
>> 
>> Best,
>> Stefan
>> 
>> 
>> 
>> 2018-01-03 18:43 GMT+08:00 Stefan Richter :
>> 
>> 
>> 
>> Am 01.01.2018 um 15:22 schrieb Jinhua Luo :
>> 
>> 2017-12-08 18:25 GMT+08:00 Stefan Richter :
>> 
>> You need to be a bit careful if your sink needs exactly-once semantics. In
>> this case things should either be idempotent or the db must support rolling
>> back changes between checkpoints, e.g. via transactions. Commits should be
>> triggered for confirmed checkpoints („notifyCheckpointComplete“).
>> 
>> 
>> I doubt if we have a risk here: in notifyCheckpointComplete, the
>> checkpoint was completed, and if the process crashes (or machine
>> failure) before it commits the db, the flink would restart the app,
>> restoring the state from the last checkpoint, but it would not invoke
>> notifyCheckpointComplete again? correct? if so, we would miss the
>> database ingress for the data between the last two checkpoints, am I
>> correct?
>> 
>> 
>> Yes, that is correct. What I was talking about was more the opposite
>> problem,i.e. committing too early. In that case, you could have committed
>> for a checkpoint that failed afterwards, and recovery will start from an
>> earlier checkpoint but with your commit already applied. You should only
>> commit after you received the notification or else your semantics can be
>> down to „at-least-once".
>> 
>> 



Re: Flink CEP with event time

2018-01-03 Thread shashank agarwal
Actually, In Kafka there are other topics also (around 5-6 topics) I am
consuming particular topic 'x' which only contains events.  Other topics
have different data.

I am using two consumers in my program for 2 different topics. in first
topic x i am extracting the timestamp from origintimestamp variable in
other one i am using system current millis.




‌

On Wed, Jan 3, 2018 at 8:06 PM, Aljoscha Krettek 
wrote:

> Ok, but will there be events in all Kafka partitions/topics?
>
>
> On 3. Jan 2018, at 15:33, shashank agarwal  wrote:
>
> Hi,
>
> Yes, Events will always carry a variable OriginTimestamp which I am using
> in the extractor. I have used fallback also in case of data missing will
> put System current millis.
>
> Still, it's not printing results.
>
> Best,
> Shashank
>
>
>
>
>
> ‌
>
> On Wed, Jan 3, 2018 at 7:40 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> Are all the partitions always carrying data that has advancing
>> timestamps? When using Event-time the Kafka source (and Flink in general)
>> needs to have steady progress in all partitions, otherwise the watermark
>> does not advance, which in turn means that processing will be stalled
>> downstream.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 3. Jan 2018, at 14:29, shashank agarwal  wrote:
>>
>> Hello,
>>
>> I have some patterns in my program. For an example,
>>
>>  A followedBy B.
>>
>> As I am using kafka source and my event API's using load balancers so
>> sometimes B comes before A. So my CEP doesn't generate any result for those
>> events.
>>
>> I have then tried event time and applied 
>> "BoundedOutOfOrdernessTimestampExtractor"
>> on kafkasource with extract time from an origin time variable which I have
>> in the event. I am using watermark lateness of 10 seconds in that.
>>
>> Now CEP stopped generating results. It's not even generating results
>> where Event B comes after A. I have tried within (10 seconds) in CEP also
>> still not generating results.
>>
>> Am I doing anything wrong?
>>
>> I have to cover the case where B can come after A from Kafka.
>>
>> --
>> Thanks Regards
>>
>> SHASHANK AGARWAL
>>  ---  Trying to mobilize the things
>>
>>
>>
>>
>> ‌
>>
>>
>>
>
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things
>
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things


Re: RichAsyncFunction in scala

2018-01-03 Thread Aljoscha Krettek
Hi,

There is this Jira issue: https://issues.apache.org/jira/browse/FLINK-6756 


Best,
Aljoscha

> On 28. Dec 2017, at 17:10, Antoine Philippot  
> wrote:
> 
> Hi Ufuk,
> 
> I don't think it is possible as I use this function as a parameter of 
> AsyncDataStream (from the scala API) which is mandatory to use with the scala 
> DataStream.
> 
> 
> 
> Le jeu. 28 déc. 2017 à 16:55, Ufuk Celebi  > a écrit :
> Hey Antoine,
> 
> isn't it possible to use the Java RichAsyncFunction from Scala like this:
> 
> class Test extends RichAsyncFunction[Int, Int] {
> 
>   override def open(parameters: Configuration): Unit = super.open(parameters)
> 
>   override def asyncInvoke(input: Int, resultFuture:
> functions.async.ResultFuture[Int]): Unit = ???
> }
> 
> – Ufuk
> 
> 
> 
> On Thu, Dec 28, 2017 at 4:37 PM, Antoine Philippot
> > wrote:
> > Hi,
> >
> > It lacks a version of RichAsyncFunction class in the scala API or the
> > possibility to handle a class which extends AbstractRichFunction and
> > implements AsyncFunction (from the scala API).
> >
> > I made a small dev on our current flink fork because we need to use the open
> > method to add our custom metrics from getRuntimeContext.getMetricGroup
> > method.
> > https://github.com/aphilippot/flink/commit/acbd49cf2d64163040f2f954ce55917155b408ba
> >  
> > 
> >
> > Do you already plan to release this feature soon ? Do you want me to create
> > a new Jira ticket, propose a pull request ?
> >
> > Antoine



Re: BackPressure handling

2018-01-03 Thread Vishal Santoshi
To add and an interim solution to the issue.

I extended the based on the advise "custom source/adapt an existing source"
and put in a RateLimiter ( guava ) that effectively put a cap on each kafka
consumer  ( x times the expected incident rqs ). That solved the issue  as
in it stabilized the flow into down stream window operation ( I use
ProcessFunction for sessionizing and why is another discussion ) .

 This leads me to these conclusions and either of them could be correct


* The kakfa partitions are on different brokers ( the leaders ) and based
on the how efficient the broker is ( whether it has data in OS Cache  or
whether there is a skew in leader distribution and thus more stress on a n
of m brokers) the consumption speed can vary.
* The skew caused by number of consumers to number of flink nodes ( if no.
of partitions % no of flink nodes == 0 there is no skew ) the consumption
rate can vary.
* Some TM nodes may be sluggish.

Either ways any of the above reasons can cause data to be consumed at
different speeds which could lead to an imbalance and b'coz of the paucity
of fine grained back pressure handling leads to more windows that remain
open, windows that reflect the more aggressive consumption ( and thus more
variance from the current WM)  than prudent, causing the pathological case
described above. By regulating the consumption rate ( I put the delimiter
in the extractTimestamp method ) , it effectively caused the more
aggressive consumptions to a fixed upper bound, making the rate of
consumption across consumers effectively similar.

Either ways it seems imperative that https://issues.apache.org/
jira/browse/FLINK-7282 should be finalized at the earliest. The
consequences on a shared flink cluster are too huge IMHO.

Please tell me if my conclusions are problematic or do not make sense.


Regards

Vishal






On Tue, Jan 2, 2018 at 3:04 PM, Vishal Santoshi 
wrote:

> Also note that if I were to start 2 pipelines
>
> 1. Working off the head of the topic and thus not prone to the
> pathological case described above
> 2. Doing a replay and thus prone to the  pathological case described above
>
> Than the 2nd pipe will stall the 1st pipeline. This seems to to point to
>
>- All channels multiplexed into the same TCP connection stall
>together, as soon as one channel has backpressure.
>
>
> of the jira issue. This has to be a priority IMHO, in a shared VM where
> jobs should have at least some isolation.
>
> On Tue, Jan 2, 2018 at 2:19 PM, Vishal Santoshi  > wrote:
>
>> Thank you.
>>
>> On Tue, Jan 2, 2018 at 1:31 PM, Nico Kruber 
>> wrote:
>>
>>> Hi Vishal,
>>> let me already point you towards the JIRA issue for the credit-based
>>> flow control: https://issues.apache.org/jira/browse/FLINK-7282
>>>
>>> I'll have a look at the rest of this email thread tomorrow...
>>>
>>>
>>> Regards,
>>> Nico
>>>
>>> On 02/01/18 17:52, Vishal Santoshi wrote:
>>> > Could you please point me to any documentation on the  "credit-based
>>> > flow control" approach
>>> >
>>> > On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther >> > > wrote:
>>> >
>>> > Hi Vishal,
>>> >
>>> > your assumptions sound reasonable to me. The community is currently
>>> > working on a more fine-grained back pressuring with credit-based
>>> > flow control. It is on the roamap for 1.5 [1]/[2]. I will loop in
>>> > Nico that might tell you more about the details. Until then I guess
>>> > you have to implement a custom source/adapt an existing source to
>>> > let the data flow in more realistic.
>>> >
>>> > Regards,
>>> > Timo
>>> >
>>> > [1]
>>> > http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5
>>> -timeline.html
>>> > >> 5-timeline.html>
>>> > [2] https://www.youtube.com/watch?v=scStdhz9FHc
>>> > 
>>> >
>>> >
>>> > Am 1/2/18 um 4:02 PM schrieb Vishal Santoshi:
>>> >
>>> > I did a simulation on session windows ( in 2 modes ) and let it
>>> > rip for about 12 hours
>>> >
>>> > 1. Replay where a kafka topic with retention of 7 days was the
>>> > source ( earliest )
>>> > 2. Start the pipe with kafka source ( latest )
>>> >
>>> > I saw results that differed dramatically.
>>> >
>>> > On replay the pipeline stalled after  good ramp up while in the
>>> > second case the pipeline hummed on without issues. For the same
>>> > time period the data consumed is significantly more in the
>>> > second case with the WM progression stalled in the first case
>>> > with no hint of resolution ( the incoming data on source topic
>>> > far outstrips the WM progression )  I think I know the reasons
>>> > and this is my hypothesis.
>>> >
>>> > In 

Re: MergingWindow

2018-01-03 Thread Aljoscha Krettek
Yes, this is a very good description!

To see this in action you can run MergingWindowSetTest and comment out the 
check in MergingWindowSet, then you will see test failures and can trace what 
situations lead to problematic behaviour without that check.

> On 29. Dec 2017, at 15:07, jincheng sun  wrote:
> 
> Hi  aitozi,
> 
> `MergingWindowSet` is a Utility, used for keeping track of merging Windows 
> when using a MergingWindowAssigner in a WindowOperator. 
> 
> In flink  `MergingWindowAssigner`  only used for SessionWindow. The 
> implementations of  `MergingWindowAssigner` are `EventTimeSessionWindows` and 
> `ProcessingTimeSessionWindows`. As we know Session window depends on the 
> element of the time gap to split the window, if you encounter out-of-order 
> elements, there is a merge window situation.  
> 
> 
> In the `processElement` method of WindowOperator, when adding the new window 
> might result in a merge. the merge logic is in the `addWindow` method of 
> `MergingWindowSet`. The snippet you mentioned is in that method. To 
> understand the code snippet above, we must understand the collation logic of 
> the merge window。 Let me cite a merge example to illustrate the merging 
> logic, if we have two windows WinA, WinB, when WinC is added, we find WinA, 
> WinB, WinC should be merged. So, in this time WinC is new Window WinA and 
> WinB are in `MergingWindowSet.mapping`, the `mapping` is Map, Mapping 
> from window to the window that keeps the window state. When we are 
> incrementally merging windows starting from some window we keep that starting 
> window as the state window to prevent costly state juggling. As shown below:
> 
> 
> ​
> Now we  know the logic of merge window, and we talk about the logic of you 
> mentioned above:
> if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
> mergeFunction.merge(mergeResult,
> mergedWindows,
> this.mapping.get(mergeResult),
> mergedStateWindows);
> }
> 
> This code is guarantee that don't merge the new window itself, it never had 
> any state associated with it i.e. if we are only merging one pre-existing 
> window into itself without extending the pre-exising window.
> 
> I am not sure if the explanation is clear, but I hope to be helpful to you. 
> :) 
> And welcome anybody feedback... :)
> 
> Best, Jincheng
> 
> 2017-12-27 16:58 GMT+08:00 Ufuk Celebi  >:
> Please check your email before sending it the next time as three
> emails for the same message is a little spammy ;-)
> 
> This is internal code that is used to implement session windows as far
> as I can tell. The idea is to not merge the new window as it never had
> any state associated with it. The general idea of merging windows is
> to keep one of the original windows as the state window, i.e. the
> window that is used as namespace to store the window elements.
> Elements from the state windows of merged windows must be merged into
> this one state window.
> 
> For more details, this should be directed to the dev mailing list.
> 
> – Ufuk
> 
> On Tue, Dec 26, 2017 at 4:58 AM, aitozi  > wrote:
> > Hi,
> >
> > i cant unserstand usage of this snippest of the code in
> > MergingWindowSet.java, can anyone explain this for me ?
> >
> >
> > if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
> > mergeFunction.merge(mergeResult,
> > mergedWindows,
> > 
> > this.mapping.get(mergeResult),
> > mergedStateWindows);
> > }
> >
> >
> >
> > --
> > Sent from: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> > 
> 



Re: Flink CEP with event time

2018-01-03 Thread Aljoscha Krettek
Ok, but will there be events in all Kafka partitions/topics?

> On 3. Jan 2018, at 15:33, shashank agarwal  wrote:
> 
> Hi,
> 
> Yes, Events will always carry a variable OriginTimestamp which I am using in 
> the extractor. I have used fallback also in case of data missing will put 
> System current millis. 
> 
> Still, it's not printing results.
> 
> Best,
> Shashank
> 
> 
> 
> 
> 
> ‌
> 
> On Wed, Jan 3, 2018 at 7:40 PM, Aljoscha Krettek  > wrote:
> Hi,
> 
> Are all the partitions always carrying data that has advancing timestamps? 
> When using Event-time the Kafka source (and Flink in general) needs to have 
> steady progress in all partitions, otherwise the watermark does not advance, 
> which in turn means that processing will be stalled downstream.
> 
> Best,
> Aljoscha
> 
> 
>> On 3. Jan 2018, at 14:29, shashank agarwal > > wrote:
>> 
>> Hello,
>> 
>> I have some patterns in my program. For an example,
>> 
>>  A followedBy B.
>> 
>> As I am using kafka source and my event API's using load balancers so 
>> sometimes B comes before A. So my CEP doesn't generate any result for those 
>> events. 
>> 
>> I have then tried event time and applied 
>> "BoundedOutOfOrdernessTimestampExtractor" on kafkasource with extract time 
>> from an origin time variable which I have in the event. I am using watermark 
>> lateness of 10 seconds in that.
>> 
>> Now CEP stopped generating results. It's not even generating results where 
>> Event B comes after A. I have tried within (10 seconds) in CEP also still 
>> not generating results. 
>> 
>> Am I doing anything wrong?
>> 
>> I have to cover the case where B can come after A from Kafka.
>> 
>> -- 
>> Thanks Regards
>> 
>> SHASHANK AGARWAL
>>  ---  Trying to mobilize the things
>> 
>> 
>> 
>> 
>> ‌
> 
> 
> 
> 
> -- 
> Thanks Regards
> 
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things



Re: Flink CEP with event time

2018-01-03 Thread shashank agarwal
Hi,

Yes, Events will always carry a variable OriginTimestamp which I am using
in the extractor. I have used fallback also in case of data missing will
put System current millis.

Still, it's not printing results.

Best,
Shashank





‌

On Wed, Jan 3, 2018 at 7:40 PM, Aljoscha Krettek 
wrote:

> Hi,
>
> Are all the partitions always carrying data that has advancing timestamps?
> When using Event-time the Kafka source (and Flink in general) needs to have
> steady progress in all partitions, otherwise the watermark does not
> advance, which in turn means that processing will be stalled downstream.
>
> Best,
> Aljoscha
>
>
> On 3. Jan 2018, at 14:29, shashank agarwal  wrote:
>
> Hello,
>
> I have some patterns in my program. For an example,
>
>  A followedBy B.
>
> As I am using kafka source and my event API's using load balancers so
> sometimes B comes before A. So my CEP doesn't generate any result for those
> events.
>
> I have then tried event time and applied 
> "BoundedOutOfOrdernessTimestampExtractor"
> on kafkasource with extract time from an origin time variable which I have
> in the event. I am using watermark lateness of 10 seconds in that.
>
> Now CEP stopped generating results. It's not even generating results where
> Event B comes after A. I have tried within (10 seconds) in CEP also still
> not generating results.
>
> Am I doing anything wrong?
>
> I have to cover the case where B can come after A from Kafka.
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things
>
>
>
>
> ‌
>
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things


Re: Separate checkpoint directories

2018-01-03 Thread Kyle Hamlin


> On Jan 3, 2018, at 5:51 AM, Stefan Richter  
> wrote:
> 
> Hi,
> 
> first, let my ask why you want to have a different checkpoint directory per 
> topic? It is perfectly ok to have just a single checkpoint directory, so I 
> wonder what the intention is? Flink will already create proper subdirectories 
> and filenames and can identify the right checkpoint data for each operator 
> instance.
> 
> Best,
> Stefan
> 
>> Am 31.12.2017 um 17:10 schrieb Kyle Hamlin :
>> 
>> Flink 1.4 added regex pattern matching for FlinkKafkaConsumer's which is a 
>> neat feature. I would like to use this feature, but I'm wondering how that 
>> impacts the FsStateBackend checkpointing mechanism. Before I would subscribe 
>> to one topic and set a checkpoint path specific to that topic for example if 
>> the Kafka topic name was foo:
>> 
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStateBackend(new FsStateBackend("s3://checkpoints/foo/"))
>> 
>> How does one dynamically set these checkpoint paths? Is it even necessary to 
>> do so, should I have one checkpoint path for all the possible topics the 
>> regex pattern could pick up?
> 


Re: keyby() issue

2018-01-03 Thread Aljoscha Krettek
Side note: Sliding windows can be quite expensive if the slide is small 
compared to the size. Flink will treat each "slide" as a separate window, so in 
your case you will get 60 * num_keys windows, which can become quite big.

Best,
Aljoscha

> On 2. Jan 2018, at 17:41, Timo Walther  wrote:
> 
> Hi Jinhua,
> 
> did you check the key group assignments? What is the distribution of 
> "MathUtils.murmurHash(keyHash) % maxParallelism" on a sample of your data? 
> This also depends on the hashCode on the output of your KeySelector.
> 
> keyBy should handle high traffic well, but it is designed for key spaces with 
> thousands or millions of values. If this is not the case, you need to 
> introduce some more artifical key to spread the load more evenly.
> 
> Regarding your OutOfMemoryError: I think you producing elements much faster 
> than the following operators after keyBy process/discard the elements. Can 
> you explain us your job in more detail? Are you using event-time? How do you 
> aggregate elements of the windows?
> 
> Regards,
> Timo
> 
> 
> 
> Am 1/1/18 um 6:00 AM schrieb Jinhua Luo:
>> I checked the logs, but no information indicates what happens.
>> 
>> In fact, in the same app, there is another stream, but its kafka
>> source is low traffic, and I aggregate some field of that source too,
>> and flink gives correct results continuously.
>> So I doubt if keyby() could not handle high traffic well (which
>> affects the number of keys in the key partitions).
>> 
>> 2018-01-01 2:04 GMT+08:00 Steven Wu :
  but soon later, no results produced, and flink seems busy doing something
 forever.
>>> Jinhua, don't know if you have checked these things. if not, maybe worth a
>>> look.
>>> 
>>> have you tried to do a thread dump?
>>> How is the GC pause?
>>> do you see flink restart? check the exception tab in Flink web UI for your
>>> job.
>>> 
>>> 
>>> 
>>> On Sun, Dec 31, 2017 at 6:20 AM, Jinhua Luo  wrote:
 I take time to read some source codes about the keyed stream
 windowing, and I make below understanding:
 
 a) the keyed stream would be split and dispatched to downstream tasks
 in hash manner, and the hash base is the parallelism of the downstream
 operator:
 
 See
 org.apache.flink.runtime.state.KeyGroupRangeAssignment.computeKeyGroupForKeyHash(int,
 int):
 MathUtils.murmurHash(keyHash) % maxParallelism;
 
 That's what the doc said "hash partitioning".
 
 So the compiled execution graph already determines whose operator
 instance receive which key groups.
 
 b) with windowing, the key is used to index window states, so the
 window function would receive the deserialized value from its
 corresponding window state of some key.
 
 b.1) The element would be added into the state first:
 
 See
 org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(StreamRecord):
 windowState.add(element.getValue());
 
 b.2) when the trigger fires the window, the value would be
 deserialized from the keyed state:
 
 ACC contents = windowState.get();
 emitWindowContents(actualWindow, contents);
 
 For rocksdb backend, each input element would be taken back and forth
 from the disk in the processing.
 
 flink's keyed stream has the same functionality as storm's field
 grouping, and more complicated.
 
 Am I correct?
 
 
 But I still could not understand why keyby() stops flink from
 returning expected results.
 
 Let me explain my case more:
 I use kafka data source, which collects log lines of log files from
 tens of machines.
 The log line is in json format, which contains the "ip" field, the ip
 address of the user, so it could be valued in million of ip addresses
 of the Internet.
 The stream processing is expected to result in ip aggregation in {1
 hour, 1 min} sliding window.
 
 If I use keyBy("ip"), then at first minutes, the flink could give me
 correct aggregation results, but soon later, no results produced, and
 flink seems busy doing something forever.
 
 I doubt if keyby() could handle huge keys like this case, and when I
 remove keyby().window().fold() and use windowAll().fold() instead (the
 latter fold operator uses hashmap to aggregate ip by itself), flink
 works. But as known, the windowAll() is not scale-able.
 
 Could flink developers help me on this topic, I prefer flink and I
 believe flink is one of best stream processing frameworks, but I am
 really frustrated that flink could be fulfill its feature just like
 the doc said.
 
 Thank you all.
 
 
 2017-12-29 17:42 GMT+08:00 Jinhua Luo :
> I misuse the key selector. I checked the doc and found it must return
> deterministic key, so using 

Re: Flink CEP with event time

2018-01-03 Thread Aljoscha Krettek
Hi,

Are all the partitions always carrying data that has advancing timestamps? When 
using Event-time the Kafka source (and Flink in general) needs to have steady 
progress in all partitions, otherwise the watermark does not advance, which in 
turn means that processing will be stalled downstream.

Best,
Aljoscha

> On 3. Jan 2018, at 14:29, shashank agarwal  wrote:
> 
> Hello,
> 
> I have some patterns in my program. For an example,
> 
>  A followedBy B.
> 
> As I am using kafka source and my event API's using load balancers so 
> sometimes B comes before A. So my CEP doesn't generate any result for those 
> events. 
> 
> I have then tried event time and applied 
> "BoundedOutOfOrdernessTimestampExtractor" on kafkasource with extract time 
> from an origin time variable which I have in the event. I am using watermark 
> lateness of 10 seconds in that.
> 
> Now CEP stopped generating results. It's not even generating results where 
> Event B comes after A. I have tried within (10 seconds) in CEP also still not 
> generating results. 
> 
> Am I doing anything wrong?
> 
> I have to cover the case where B can come after A from Kafka.
> 
> -- 
> Thanks Regards
> 
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things
> 
> 
> 
> 
> ‌



Re: does the flink sink only support bio?

2018-01-03 Thread Jinhua Luo
No, I mean how to implement exactly-once db commit (given our async io
target is mysql), not the state used by flink.
As mentioned in previous mail, if I commit db in
notifyCheckpointComplete, we have a risk to lost data (lost commit,
and flink restart would not trigger notifyCheckpointComplete for the
last checkpoint again).
On the other hand, if I update and commit per record, the sql/stored
procedure have to handle duplicate updates at failure restart.

So, when or where to commit so that we could get exactly-once db ingress.

2018-01-03 21:57 GMT+08:00 Stefan Richter :
>
> Hi,
>
>
> Then how to implement exactly-once async io? That is, neither missing
> data or duplicating data.
>
>
> From the docs about async IO here
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
> :
>
> "Fault Tolerance Guarantees:
> The asynchronous I/O operator offers full exactly-once fault tolerance
> guarantees. It stores the records for in-flight asynchronous requests in
> checkpoints and restores/re-triggers the requests when recovering from a
> failure.“
>
> So it is already handled by Flink in a way that supports exactly-once.
>
> Is there some way to index data by checkpoint id and records which
> checkpoints already commit to db? But that means we need MapState,
> right?
>
>
> The information required depends a bit on the store that you are using,
> maybe the last confirmed checkpoint id is enough, but maybe you require
> something more. This transaction information is probably not „by-key“, but
> „per-operator“, so I would suggest to use operator state (see next answer).
> Btw the implementation of async operators does something very similar to
> restore pending requests, and you can see the code in „AsyncWaitOperator".
>
>
> However, the async-io operator normally follows other operators, e.g.
> fold, so it normally faces the DataStream but not KeyedStream, and
> DataStream only supports ListState, right?
>
>
> You can use non-keyed state, aka operator state, to store such information.
> See here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
> . It does not require a KeyedSteam.
>
> Best,
> Stefan
>
>
>
> 2018-01-03 18:43 GMT+08:00 Stefan Richter :
>
>
>
> Am 01.01.2018 um 15:22 schrieb Jinhua Luo :
>
> 2017-12-08 18:25 GMT+08:00 Stefan Richter :
>
> You need to be a bit careful if your sink needs exactly-once semantics. In
> this case things should either be idempotent or the db must support rolling
> back changes between checkpoints, e.g. via transactions. Commits should be
> triggered for confirmed checkpoints („notifyCheckpointComplete“).
>
>
> I doubt if we have a risk here: in notifyCheckpointComplete, the
> checkpoint was completed, and if the process crashes (or machine
> failure) before it commits the db, the flink would restart the app,
> restoring the state from the last checkpoint, but it would not invoke
> notifyCheckpointComplete again? correct? if so, we would miss the
> database ingress for the data between the last two checkpoints, am I
> correct?
>
>
> Yes, that is correct. What I was talking about was more the opposite
> problem,i.e. committing too early. In that case, you could have committed
> for a checkpoint that failed afterwards, and recovery will start from an
> earlier checkpoint but with your commit already applied. You should only
> commit after you received the notification or else your semantics can be
> down to „at-least-once".
>
>


Re: does the flink sink only support bio?

2018-01-03 Thread Stefan Richter

Hi,

> 
> Then how to implement exactly-once async io? That is, neither missing
> data or duplicating data.

From the docs about async IO here 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
 

 :

"Fault Tolerance Guarantees:
The asynchronous I/O operator offers full exactly-once fault tolerance 
guarantees. It stores the records for in-flight asynchronous requests in 
checkpoints and restores/re-triggers the requests when recovering from a 
failure.“

So it is already handled by Flink in a way that supports exactly-once.

> Is there some way to index data by checkpoint id and records which
> checkpoints already commit to db? But that means we need MapState,
> right?

The information required depends a bit on the store that you are using, maybe 
the last confirmed checkpoint id is enough, but maybe you require something 
more. This transaction information is probably not „by-key“, but 
„per-operator“, so I would suggest to use operator state (see next answer). Btw 
the implementation of async operators does something very similar to restore 
pending requests, and you can see the code in „AsyncWaitOperator".

> 
> However, the async-io operator normally follows other operators, e.g.
> fold, so it normally faces the DataStream but not KeyedStream, and
> DataStream only supports ListState, right?

You can use non-keyed state, aka operator state, to store such information. See 
here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
 

 . It does not require a KeyedSteam.

Best,
Stefan

> 
> 
> 2018-01-03 18:43 GMT+08:00 Stefan Richter :
>> 
>> 
>>> Am 01.01.2018 um 15:22 schrieb Jinhua Luo :
>>> 
>>> 2017-12-08 18:25 GMT+08:00 Stefan Richter :
 You need to be a bit careful if your sink needs exactly-once semantics. In 
 this case things should either be idempotent or the db must support 
 rolling back changes between checkpoints, e.g. via transactions. Commits 
 should be triggered for confirmed checkpoints („notifyCheckpointComplete“).
>>> 
>>> I doubt if we have a risk here: in notifyCheckpointComplete, the
>>> checkpoint was completed, and if the process crashes (or machine
>>> failure) before it commits the db, the flink would restart the app,
>>> restoring the state from the last checkpoint, but it would not invoke
>>> notifyCheckpointComplete again? correct? if so, we would miss the
>>> database ingress for the data between the last two checkpoints, am I
>>> correct?
>> 
>> Yes, that is correct. What I was talking about was more the opposite 
>> problem,i.e. committing too early. In that case, you could have committed 
>> for a checkpoint that failed afterwards, and recovery will start from an 
>> earlier checkpoint but with your commit already applied. You should only 
>> commit after you received the notification or else your semantics can be 
>> down to „at-least-once".
>> 



Re: Job Manager not able to fetch job info when restarted

2018-01-03 Thread Stefan Richter
Hi,

did you configure high availability with Zookeeper, as described here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html
 

 ? This should bring the desired behaviour.

Best,
Stefan

> Am 03.01.2018 um 14:04 schrieb Sushil Ks :
> 
> Hi,
>   We are launching Flink on Yarn job, here whenever the job manager gets 
> restarted, the previous task that was running doesn't get restarted instead 
> it shows no jobs running. Is there a way to resume previous running task, 
> when job manager restarts.
> 
> Regards,
> Sushil Ks 



Flink CEP with event time

2018-01-03 Thread shashank agarwal
Hello,

I have some patterns in my program. For an example,

 A followedBy B.

As I am using kafka source and my event API's using load balancers so
sometimes B comes before A. So my CEP doesn't generate any result for those
events.

I have then tried event time and applied
"BoundedOutOfOrdernessTimestampExtractor" on kafkasource with extract time
from an origin time variable which I have in the event. I am using
watermark lateness of 10 seconds in that.

Now CEP stopped generating results. It's not even generating results where
Event B comes after A. I have tried within (10 seconds) in CEP also still
not generating results.

Am I doing anything wrong?

I have to cover the case where B can come after A from Kafka.

-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things




‌


Re: Lower Parallelism derives better latency

2018-01-03 Thread Stefan Richter
Hi,

one possible explanation that I see is the following: in a shuffle, each there 
are input and output buffers for each parallel subtask to which data could be 
shuffled. Those buffers are flushed either when full or after a timeout 
interval. If you increase the parallelism, there are more buffers and each 
buffer gets a smaller fraction of the data. This, in turn, means that it takes 
longer until an individual buffer is full and data is emitted. The timeout 
interval enforces an upper bound.

Your experiments works on a very small scale, and I would not assume that this 
would increase latency without bounds - at least once you hit the buffer 
timeout interval the latency should no longer increase. You could validate this 
by configuring smaller buffer sizes and test how this impacts the experiment.

Best,
Stefan

> Am 03.01.2018 um 08:13 schrieb Netzer, Liron :
> 
> Hi group,
>  
> We have a standalone Flink cluster that is running on a UNIX host with 40 
> CPUs and 256GB RAM.
> There is one task manager and 24 slots were defined.
> When we decrease the parallelism of the Stream graph operators(each operator 
> has the same parallelism),
> we see a consistent change in the latency, it gets better:
> Test run
> Parallelism
> 99 percentile
> 95 percentile
> 75 percentile
> Mean
> #1
> 8
> 4.15 ms
> 2.02 ms
> 0.22 ms
> 0.42 ms
> #2
> 7
> 3.6 ms
> 1.68 ms
> 0.14 ms
> 0.34 ms
> #3
> 6
> 3 ms
> 1.4 ms
> 0.13 ms
> 0.3 ms
> #4
> 5
> 2.1 ms
> 0.95 ms
> 0.1 ms
> 0.22 ms
> #5
> 4
> 1.5 ms
> 0.64 ms
> 0.09 ms
> 0.16 ms
>  
> This was a surprise for us, as we expected that higher parallelism will 
> derive better latency.
> Could you try to assist us to understand this behavior? 
> I know that when there are more threads that are involved, there is probably 
> more serialization/deserialization, but this can't be the only reason for 
> this behavior.
>  
> We have two Kafka sources, and the rest of the operators are fixed windows, 
> flatmaps, coMappers and several KeyBys. 
> Except for the Kafka sources and some internal logging, there is no other I/O 
> (i.e. we do not connect to any external DB/queue)
> We use Flink 1.3.
>  
>  
> Thanks,
> Liron



Re: does the flink sink only support bio?

2018-01-03 Thread Jinhua Luo
Then how to implement exactly-once async io? That is, neither missing
data or duplicating data.

Is there some way to index data by checkpoint id and records which
checkpoints already commit to db? But that means we need MapState,
right?

However, the async-io operator normally follows other operators, e.g.
fold, so it normally faces the DataStream but not KeyedStream, and
DataStream only supports ListState, right?


2018-01-03 18:43 GMT+08:00 Stefan Richter :
>
>
>> Am 01.01.2018 um 15:22 schrieb Jinhua Luo :
>>
>> 2017-12-08 18:25 GMT+08:00 Stefan Richter :
>>> You need to be a bit careful if your sink needs exactly-once semantics. In 
>>> this case things should either be idempotent or the db must support rolling 
>>> back changes between checkpoints, e.g. via transactions. Commits should be 
>>> triggered for confirmed checkpoints („notifyCheckpointComplete“).
>>
>> I doubt if we have a risk here: in notifyCheckpointComplete, the
>> checkpoint was completed, and if the process crashes (or machine
>> failure) before it commits the db, the flink would restart the app,
>> restoring the state from the last checkpoint, but it would not invoke
>> notifyCheckpointComplete again? correct? if so, we would miss the
>> database ingress for the data between the last two checkpoints, am I
>> correct?
>
> Yes, that is correct. What I was talking about was more the opposite 
> problem,i.e. committing too early. In that case, you could have committed for 
> a checkpoint that failed afterwards, and recovery will start from an earlier 
> checkpoint but with your commit already applied. You should only commit after 
> you received the notification or else your semantics can be down to 
> „at-least-once".
>


Job Manager not able to fetch job info when restarted

2018-01-03 Thread Sushil Ks
Hi,
  We are launching Flink on Yarn job, here whenever the job manager
gets restarted, the previous task that was running doesn't get restarted
instead it shows no jobs running. Is there a way to resume previous running
task, when job manager restarts.

Regards,
Sushil Ks


Re: Lower Parallelism derives better latency

2018-01-03 Thread Aljoscha Krettek
Hi,

How are you measuring latency? Is it latency within a Flink Job or from Kafka 
to Kafka? The first seems more likely but I'm still interested in the details.

Best,
Aljoscha

> On 3. Jan 2018, at 08:13, Netzer, Liron  wrote:
> 
> Hi group,
>  
> We have a standalone Flink cluster that is running on a UNIX host with 40 
> CPUs and 256GB RAM.
> There is one task manager and 24 slots were defined.
> When we decrease the parallelism of the Stream graph operators(each operator 
> has the same parallelism),
> we see a consistent change in the latency, it gets better:
> Test run
> Parallelism
> 99 percentile
> 95 percentile
> 75 percentile
> Mean
> #1
> 8
> 4.15 ms
> 2.02 ms
> 0.22 ms
> 0.42 ms
> #2
> 7
> 3.6 ms
> 1.68 ms
> 0.14 ms
> 0.34 ms
> #3
> 6
> 3 ms
> 1.4 ms
> 0.13 ms
> 0.3 ms
> #4
> 5
> 2.1 ms
> 0.95 ms
> 0.1 ms
> 0.22 ms
> #5
> 4
> 1.5 ms
> 0.64 ms
> 0.09 ms
> 0.16 ms
>  
> This was a surprise for us, as we expected that higher parallelism will 
> derive better latency.
> Could you try to assist us to understand this behavior? 
> I know that when there are more threads that are involved, there is probably 
> more serialization/deserialization, but this can't be the only reason for 
> this behavior.
>  
> We have two Kafka sources, and the rest of the operators are fixed windows, 
> flatmaps, coMappers and several KeyBys. 
> Except for the Kafka sources and some internal logging, there is no other I/O 
> (i.e. we do not connect to any external DB/queue)
> We use Flink 1.3.
>  
>  
> Thanks,
> Liron



Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-03 Thread Stephan Ewen
The error is not a the missing Class "S3ErrorResponseHandler", but the
initialization of that class.

It could be missing classes that are statically referenced by the
"S3ErrorResponseHandler".
The ones I see are "XMLInputFactory" (part of Java itself, should always be
there) and "org.apache.commons.logging.Log" which should also be there.
There may be other classes loaded during initialization. Knowing more about
the exception would help, as we have not seen that error, yet.


On Wed, Jan 3, 2018 at 11:39 AM, Nico Kruber  wrote:

> Hi Kyle,
> except for putting the jar into the lib/ folder and setting up
> credentials, nothing else should be required [1].
>
> The S3ErrorResponseHandler class itself is in the jar, as you can see with
> jar tf flink-s3-fs-presto-1.4.0.jar | grep
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.
> S3ErrorResponseHandler
>
> Therefore, the cause for this exception would be interesting (as Stephan
> suggested).
>
>
> Nico
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/deployment/aws.html#shaded-hadooppresto-
> s3-file-systems-recommended
>
> On 03/01/18 10:22, Stephan Ewen wrote:
> > Hi Kyle!
> >
> > Is there more of the stack trace available, like an original exception
> > cause?
> >
> > Best,
> > Stephan
> >
> >
> > On Sun, Dec 31, 2017 at 5:10 PM, Kyle Hamlin  > > wrote:
> >
> > Hi,
> >
> > When testing Flink 1.4 locally the error below keeps getting thrown.
> > I've followed the setup by moving the flink-s3-fs-presto.jar from
> > the opt/ folder to the lib/ folder. Is there something additional I
> > need to do?
> >
> > java.lang.NoClassDefFoundError: Could not initialize class
> > org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.
> s3.internal.S3ErrorResponseHandler
> > at
> > org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.
> s3.AmazonS3Client.(AmazonS3Client.java:363)
> > at
> > org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.
> s3.AmazonS3Client.(AmazonS3Client.java:542)
> > at
> > org.apache.flink.fs.s3presto.shaded.com.facebook.presto.
> hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
> > at
> > org.apache.flink.fs.s3presto.shaded.com.facebook.presto.
> hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
> > at
> > org.apache.flink.fs.s3presto.S3FileSystemFactory.create(
> S3FileSystemFactory.java:132)
> > at
> > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:397)
> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> > at
> > org.apache.flink.runtime.state.filesystem.
> FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:99)
> > at
> > org.apache.flink.runtime.state.filesystem.FsStateBackend.
> createStreamFactory(FsStateBackend.java:277)
> > at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.
> createCheckpointStreamFactory(StreamTask.java:787)
> > at
> > org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:247)
> > at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:694)
> > at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:682)
> > at
> > org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> >
> >
>
>


Re: Apache Flink - Connected Stream with different number of partitions

2018-01-03 Thread Aljoscha Krettek
Hi,

The answer is correct but I'll try and elaborate a bit: the way data is sent to 
downstream operations depends on a couple of things in this case:

 - parallelism of first input operation
 - parallelism of second input operation
 - parallelism of co-operation
 - transmission pattern on first input (broadcast, rebalance, etc.)
 - transmission pattern on second input

Note that there is no parallelism on "streams" since there are technically no 
streams but only operations that are interconnected in a certain way.

Now, if the input parallelism and the operation parallelism are the same and 
you don't specify a transmission pattern then data will not be "shuffled" 
between the operations. If you specify broadcast or rebalance then you will get 
that, i.e. for broadcast an element from the input operator will be sent to 
every instance on the downstream operation.

Best,
Aljoscha

> On 3. Jan 2018, at 10:43, Timo Walther  wrote:
> 
> Hi Mans,
> 
> I did a quick test on my PC where I simply set breakpoints in map1 and map2 
> (someStream has parallelism 1, otherStream 5, my CoMapFunction 8). Elements 
> of someStream end up in different CoMapTasks (2/8, 7/8 etc.).
> 
> So I guess the distribution is a round robin partioning. @Aljoscha might know 
> more about the internals?
> 
> Regards,
> Timo
> 
> 
> 
> Am 12/31/17 um 10:38 PM schrieb M Singh:
>> Hi:
>> 
>> Referring to documentation 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html
>>  
>> )
>>  for ConnectedStreams:
>> 
>> "Connects" two data streams retaining their types. Connect allowing for 
>> shared state between the two streams.
>> DataStream someStream = //...
>> DataStream otherStream = //...
>> 
>> ConnectedStreams connectedStreams = 
>> someStream.connect(otherStream);
>> 
>> If the two connected streams have different number of partitions, eg 
>> (someStream has 4 and otherStream has 2), then how do the elements of the 
>> stream get distributed for the CoMapFunction:
>> 
>> connectedStreams.map(new CoMapFunction() {
>> @Override
>> public Boolean map1(Integer value) {
>> return true;
>> }
>> 
>> @Override
>> public Boolean map2(String value) {
>> return false;
>> }
>> });
>> 
>> I believe that that if the second stream is broadcast, then each partition 
>> of the first will get all the elements of the second.  Is my understanding 
>> correct ?
>> 
>> If the streams are not broadcast and since the first stream has 4 partitions 
>> and second one had 2, then how are the elements of the second stream 
>> distributed to each partition of the first ?
>> 
>> Also, if the streams are not broadcasted but have same number of partitions, 
>> how are the elements distributed ?
>> 
>> Thanks
>> 
>> Mans
>> 
>> 
>> 
>> 
> 



Re: Separate checkpoint directories

2018-01-03 Thread Stefan Richter
Hi,

first, let my ask why you want to have a different checkpoint directory per 
topic? It is perfectly ok to have just a single checkpoint directory, so I 
wonder what the intention is? Flink will already create proper subdirectories 
and filenames and can identify the right checkpoint data for each operator 
instance.

Best,
Stefan

> Am 31.12.2017 um 17:10 schrieb Kyle Hamlin :
> 
> Flink 1.4 added regex pattern matching for FlinkKafkaConsumer's which is a 
> neat feature. I would like to use this feature, but I'm wondering how that 
> impacts the FsStateBackend checkpointing mechanism. Before I would subscribe 
> to one topic and set a checkpoint path specific to that topic for example if 
> the Kafka topic name was foo:
> 
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStateBackend(new FsStateBackend("s3://checkpoints/foo/"))
> 
> How does one dynamically set these checkpoint paths? Is it even necessary to 
> do so, should I have one checkpoint path for all the possible topics the 
> regex pattern could pick up?



Re: Apache Flink - Question about rolling window function on KeyedStream

2018-01-03 Thread Stefan Richter
Hi,

I would interpret this as: the reduce produces an output for every new reduce 
call, emitting the updated value. There is no need for a window because it 
kicks in on every single invocation.

Best,
Stefan


> Am 31.12.2017 um 22:28 schrieb M Singh :
> 
> Hi:
> 
> Apache Flink documentation 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html)
>  indicates that a reduce function on a KeyedStream  as follows:
> 
> A "rolling" reduce on a keyed data stream. Combines the current element with 
> the last reduced value and emits the new value. 
> 
> A reduce function that creates a stream of partial sums:
> keyedStream.reduce(new ReduceFunction() {
> @Override
> public Integer reduce(Integer value1, Integer value2)
> throws Exception {
> return value1 + value2;
> }
> });
> 
> The KeyedStream is not windowed, so when does the reduce function kick in to 
> produce the DataStream (ie, is there a default time out, or collection size 
> that triggers it, since we have not defined any window on it).
> 
> Thanks
> 
> Mans



Re: does the flink sink only support bio?

2018-01-03 Thread Stefan Richter


> Am 01.01.2018 um 15:22 schrieb Jinhua Luo :
> 
> 2017-12-08 18:25 GMT+08:00 Stefan Richter :
>> You need to be a bit careful if your sink needs exactly-once semantics. In 
>> this case things should either be idempotent or the db must support rolling 
>> back changes between checkpoints, e.g. via transactions. Commits should be 
>> triggered for confirmed checkpoints („notifyCheckpointComplete“).
> 
> I doubt if we have a risk here: in notifyCheckpointComplete, the
> checkpoint was completed, and if the process crashes (or machine
> failure) before it commits the db, the flink would restart the app,
> restoring the state from the last checkpoint, but it would not invoke
> notifyCheckpointComplete again? correct? if so, we would miss the
> database ingress for the data between the last two checkpoints, am I
> correct?

Yes, that is correct. What I was talking about was more the opposite 
problem,i.e. committing too early. In that case, you could have committed for a 
checkpoint that failed afterwards, and recovery will start from an earlier 
checkpoint but with your commit already applied. You should only commit after 
you received the notification or else your semantics can be down to 
„at-least-once".



Re: scala 2.12 support/cross-compile

2018-01-03 Thread Aljoscha Krettek
Hi,

This is the umbrella issue for Scala 2.12 support. As Stephan pointed out, the 
ClosureCleaner and SAMs are currently the main problems. The first is also a 
problem for Spark, which track their respective progress here: 
https://issues.apache.org/jira/browse/SPARK-14540 
.

Best,
Aljoscha

> On 3. Jan 2018, at 10:39, Stephan Ewen  wrote:
> 
> Hi Hao Sun!
> 
> This is work in progress, but Scala 2.12 is a bit tricky. I think the Scala 
> folks have messed this version up a bit, to be honest.
> 
> The main blockers is that Scala 2.12 breaks some classes through its addition 
> of SAM interface lambdas (similar to Java). Many of the DataStream API 
> classes have two method variants (one with a Scala Function, one with a Java 
> SAM interface) which now become ambiguously overloaded methods in Scala 2.12.
> 
> In addition, Scala 2.12 also needs a different closure cleaner, because Scala 
> 2.12 compiles differently.
> 
> I am adding Aljoscha, who has started working on this...
> 
> Best,
> Stephan
> 
> 
> On Wed, Jan 3, 2018 at 4:13 AM, Hao Sun  > wrote:
> Hi team, I am wondering if there is a schedule to support scala 2.12?
> If I need flink 1.3+ with scala 2.12, do I just have to cross compile myself? 
> Is there anything blocking us from using scala 2.12?
> 
> Thanks
> 



Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-03 Thread Nico Kruber
Hi Kyle,
except for putting the jar into the lib/ folder and setting up
credentials, nothing else should be required [1].

The S3ErrorResponseHandler class itself is in the jar, as you can see with
jar tf flink-s3-fs-presto-1.4.0.jar | grep
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler

Therefore, the cause for this exception would be interesting (as Stephan
suggested).


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended

On 03/01/18 10:22, Stephan Ewen wrote:
> Hi Kyle!
> 
> Is there more of the stack trace available, like an original exception
> cause?
> 
> Best,
> Stephan
> 
> 
> On Sun, Dec 31, 2017 at 5:10 PM, Kyle Hamlin  > wrote:
> 
> Hi,
> 
> When testing Flink 1.4 locally the error below keeps getting thrown.
> I've followed the setup by moving the flink-s3-fs-presto.jar from
> the opt/ folder to the lib/ folder. Is there something additional I
> need to do?
> 
> java.lang.NoClassDefFoundError: Could not initialize class
> 
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
> at
> 
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:363)
> at
> 
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:542)
> at
> 
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
> at
> 
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
> at
> 
> org.apache.flink.fs.s3presto.S3FileSystemFactory.create(S3FileSystemFactory.java:132)
> at
> 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> at
> 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:99)
> at
> 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)
> at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)
> at
> 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
> at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 
> 



signature.asc
Description: OpenPGP digital signature


Re: JobManager not receiving resource offers from Mesos

2018-01-03 Thread Stefan Richter
Hi,

did you see this exception right at the head of your log?

java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265)
at org.apache.hadoop.util.Shell.(Shell.java:290)
at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
at org.apache.hadoop.security.Groups.(Groups.java:77)
at 
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
at 
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at 
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at 
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at 
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.util.EnvironmentInformation.getHadoopUser(EnvironmentInformation.java:96)
at 
org.apache.flink.runtime.util.EnvironmentInformation.logEnvironmentInfo(EnvironmentInformation.java:285)
at 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.main(MesosApplicationMasterRunner.java:131)

I think you forgot to configure the HADOOP_HOME properly. Does that solve your 
problem?

Best,
Stefan


> Am 03.01.2018 um 07:12 schrieb 김동원 :
> 
> Oops, I forgot to include files in the previous mail.
> 
> 
> 
> 
> 
> 
> 
> 
>> 2018. 1. 3. 오후 3:10, 김동원 > > 작성:
>> 
>> Hi,
>> 
>> I try to launch a Flink cluster on top of dc/os but TaskManagers are not 
>> launched at all.
>> 
>> What I do to launch a Flink cluster is as follows:
>> - Click "flink" from "Catalog" on the left panel of dc/os GUI.
>> - Click "Run service" without any modification on configuration for the 
>> purpose of testing (Figure 1).
>> 
>> Until now, everything seems okay as shown in Figure 2.
>> However, Figure 3 shows that TaskManager has never been launched.
>> 
>> So I take a look at JobManager log (see the attached "log.txt" for full log).
>> LaunchCoordinator is spitting the same log messages while staying in 
>> "GetheringOffers" state as follows:
>> INFO  org.apache.flink.mesos.scheduler.LaunchCoordinator- 
>> Processing 1 task(s) against 0 new offer(s) plus outstanding off$
>> DEBUG com.netflix.fenzo.TaskScheduler   - Found 
>> 0 VMs with non-zero offers to assign from
>> INFO  org.apache.flink.mesos.scheduler.LaunchCoordinator- 
>> Resources considered: (note: expired offers not deducted from be$
>> DEBUG org.apache.flink.mesos.scheduler.LaunchCoordinator- 
>> SchedulingResult{resultMap={}, failures={}, leasesAdded=0, lease$
>> INFO  org.apache.flink.mesos.scheduler.LaunchCoordinator- 
>> Waiting for more offers; 1 task(s) are not yet launched.
>> (FYI, ConnectionMonitor is in its "ConnectedState" as you can see in the 
>> full log file.)
>> 
>> Can anyone point out what's going wrong on my dc/os installation?
>> Thanks you for attention. I'm really looking forward to running Flink 
>> clusters on dc/os :-)
>> 
>> p.s. I tested whether dc/os is working correctly by using the following 
>> scripts and it works.
>> {
>>  "id": "simple-gpu-test",
>>  "acceptedResourceRoles":["slave_public", "*"],
>>  "cmd": "while [ true ] ; do nvidia-smi; sleep 5; done",
>>  "cpus": 1,
>>  "mem": 128,
>>  "disk": 0,
>>  "gpus": 1,
>>  "instances": 8
>> }
>> 
>> 
> 



Re: scala 2.12 support/cross-compile

2018-01-03 Thread Stephan Ewen
Hi Hao Sun!

This is work in progress, but Scala 2.12 is a bit tricky. I think the Scala
folks have messed this version up a bit, to be honest.

The main blockers is that Scala 2.12 breaks some classes through its
addition of SAM interface lambdas (similar to Java). Many of the DataStream
API classes have two method variants (one with a Scala Function, one with a
Java SAM interface) which now become ambiguously overloaded methods in
Scala 2.12.

In addition, Scala 2.12 also needs a different closure cleaner, because
Scala 2.12 compiles differently.

I am adding Aljoscha, who has started working on this...

Best,
Stephan


On Wed, Jan 3, 2018 at 4:13 AM, Hao Sun  wrote:

> Hi team, I am wondering if there is a schedule to support scala 2.12?
> If I need flink 1.3+ with scala 2.12, do I just have to cross compile
> myself? Is there anything blocking us from using scala 2.12?
>
> Thanks
>


Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-03 Thread Stephan Ewen
Hi Kyle!

Is there more of the stack trace available, like an original exception
cause?

Best,
Stephan


On Sun, Dec 31, 2017 at 5:10 PM, Kyle Hamlin  wrote:

> Hi,
>
> When testing Flink 1.4 locally the error below keeps getting thrown. I've
> followed the setup by moving the flink-s3-fs-presto.jar from the opt/
> folder to the lib/ folder. Is there something additional I need to do?
>
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.internal.
> S3ErrorResponseHandler
> at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.
> s3.AmazonS3Client.(AmazonS3Client.java:363)
> at org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.
> s3.AmazonS3Client.(AmazonS3Client.java:542)
> at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.
> hive.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:639)
> at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.
> hive.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:212)
> at org.apache.flink.fs.s3presto.S3FileSystemFactory.create(
> S3FileSystemFactory.java:132)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:397)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<
> init>(FsCheckpointStreamFactory.java:99)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.
> createStreamFactory(FsStateBackend.java:277)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createCheckpointStreamFactory(StreamTask.java:787)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:247)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:694)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:682)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>


Re: Two operators consuming from same stream

2018-01-03 Thread Timo Walther

Hi Tovi,

I think your code without duplication performs two separate shuffle 
operations whereas the other code only performs one shuffle.


Further latency impacts might be due to the overhead involved in 
maintaining the partitioning for a keyed stream/key groups and switching 
key contexts in the operator.


Did you check the latency of the following?

DataStream<> ds = 
orderKeyedStream.connect(pricesKeyedStream).flatMap(identityMapper);

ds.flatMap(mapperA);
ds.flatMap(mapperB);

Regards,
Timo


Am 1/1/18 um 2:50 PM schrieb Sofer, Tovi :


Hi group,

We have the following graph below, on which we added metrics for 
latency calculation.


We have two streams which are consumed by two operators:

·ordersStream and pricesStream – they are both consumed by two 
operators: CoMapperA and CoMapperB, each using connect.


Initially we thought that for stream consumed by two operators – that 
we need to duplicate the stream to two separate streams, so we did it 
using split as below.


Then we understood it is not a must , and two operators can consume 
same stream, so we removed the duplicate part.


However – when checking latency – we found that latency with 
duplicated streams was much better than without duplication (about twice).


_My questions: _

·Is the improved latency related to check pointing separately on those 
streams ?


·What is the cons of using the duplication if it has better latency? 
Are we harming the state correctness in any way?


_Additional Info:_

The two graphs configuration appear exactly the same in execution 
plan\web UI:




sourceOrders.keyBy,CoMapperA,OrdersStreams


prsss










sourcePrices.keyBy,CoMapperB,pricesStreams


_Code without duplication looks something like: _

KeyedStream orderKeyedStream = ordersStream.keyBy(field);

KeyedStream pricesKeyedStream = pricesStream.keyBy(field);

orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);

orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);

_Code used for duplication: _

(We duplicate streams and then do connect of pricesStreamA with 
ordersStreamA, and pricesStreamA with ordersStreamB, and keyBy as part 
of connect, similar to above).__


//duplicate prices streams

    SplitStream pricesStream = pricesStream

    .split( price -> ImmutableList.of("pricesStreamA "," 
pricesStreamB ") );


    DataStream pricesStreamA = 
pricesStreams.select("pricesStreamA");


    DataStream< Price > pricesStreamB= 
pricesStreams.select("pricesStreamB");


Thanks,

Tovi