[Blink] SQL client Kafka sink failure

2019-02-21 Thread 张洪涛


Hi all,


I am testing the Blink SQL client Kafka sink connector, but writes fail. Here are my steps:


Environment:
Blink standalone mode




1. Configure the environment and start the SQL client


2. Create the Kafka sink table
CREATE TABLE kafka_sink (
   messageKey VARBINARY,
   messageValue VARBINARY,
   PRIMARY KEY (messageKey))
with (
   type='KAFKA011',
   topic='sink-topic',
   `bootstrap.servers`='172.19.0.108:9092',
   retries='3'
);


3. Create and execute the INSERT statement
INSERT INTO kafka_sink
SELECT CAST('123' AS VARBINARY) AS key,
CAST(CONCAT_WS(',', 'HELLO', 'WORLD') AS VARBINARY) AS msg;




Error log (from the task executor log)


The main problem is that a class under the Kafka common package cannot be found. However, the
Kafka connector jars were already included when starting the SQL client, and when the job is
submitted those jars are uploaded to the cluster together with the job graph, so in theory all
of these classes should be loadable.






2019-02-22 14:37:18,356 ERROR 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.KafkaThread  - 
Uncaught exception in kafka-producer-network-thread | producer-1:
java.lang.NoClassDefFoundError: 
org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.DefaultRecordBatch.writeHeader(DefaultRecordBatch.java:468)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder.writeDefaultBatchHeader(MemoryRecordsBuilder.java:339)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:293)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:391)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:485)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:254)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
at 
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.Crc32C
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)




--
  Best Regards
  Hongtao



Re: SinkFunction.Context

2019-02-21 Thread Rong Rong
Hi Durga,

1. currentProcessingTime: refers to this operator's (the SinkFunction's)
system time at the moment invoke() is called.
1a. The time you are referring to as "when the flink window got the message"
is currentProcessingTime() invoked at the window operator (which is provided
by the window Context, similar to this one [1]).
2. currentWatermark: refers to the current watermark [2] received by this
operator (the SinkFunction).
3. timestamp: is the input record's event-time timestamp (here "input" refers
to the input of the SinkFunction, not to the entire Flink
topology).
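
To make this concrete, here is a minimal sketch (not from the original thread)
of a sink that just logs the three values; it assumes a stream of Strings and
is only meant to show where each timestamp comes from:

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class ContextLoggingSink implements SinkFunction<String> {

    @Override
    public void invoke(String value, Context context) {
        long processingTime = context.currentProcessingTime(); // this sink subtask's wall clock at invoke()
        long watermark = context.currentWatermark();           // last event-time watermark received by this sink
        Long eventTime = context.timestamp();                  // event-time of this record, or null if none was assigned
        System.out.printf("value=%s processingTime=%d watermark=%d eventTime=%s%n",
                value, processingTime, watermark, eventTime);
    }
}

Attach it with stream.addSink(new ContextLoggingSink()) on a DataStream<String>.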

Hope these help.

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#event-time-and-watermarks

On Thu, Feb 21, 2019 at 4:59 PM Durga Durga  wrote:

>
> HI Folks,
>
> Was following the documentation for
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.Context.html
>
>
>
> long currentProcessingTime(): Returns the current processing time.
> long currentWatermark(): Returns the current event-time watermark.
> Long timestamp(): Returns the timestamp of the current input record, or null if the element
> does not have an assigned timestamp.
>
> - currentProcessingTime - is this the event time, i.e. the time when
> the event occurred, or the time when the flink window got the message?
>
> - timestamp - is this the time the record is persisted into the sink,
> or the aggregated data's timestamp? Say I have 100 records in my time
> window - which time comes into effect?
>
> - currentWatermark - which time is this - the time the event occurred? What
> will this value be when there are 1000 records in my time window?
>
> PS - We want to use some ID (timestamp) and associate it with all the
> records that are persisted (aggregated) in a given time window - i.e. if
> there are 1000 records aggregated and they resulted in 10 aggregated
> records, we want to give these 10 aggregated records the same ID, and we
> want to use one of the above timestamps.
>
> Thanks much.
>


SinkFunction.Context

2019-02-21 Thread Durga Durga
HI Folks,

Was following the documentation for

https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.Context.html



long currentProcessingTime(): Returns the current processing time.
long currentWatermark(): Returns the current event-time watermark.
Long timestamp(): Returns the timestamp of the current input record, or null if the element
does not have an assigned timestamp.

- currentProcessingTime - is this the event time, i.e. the time when
the event occurred, or the time when the flink window got the message?

- timestamp - is this the time the record is persisted into the sink,
or the aggregated data's timestamp? Say I have 100 records in my time
window - which time comes into effect?

- currentWatermark - which time is this - the time the event occurred? What
will this value be when there are 1000 records in my time window?

PS - We want to use some ID (timestamp) and associate it with all the
records that are persisted (aggregated) in a given time window - i.e. if
there are 1000 records aggregated and they resulted in 10 aggregated
records, we want to give these 10 aggregated records the same ID, and we
want to use one of the above timestamps.

Thanks much.


Re: Jira issue Flink-11127

2019-02-21 Thread Boris Lublinsky
Adding metric-query port makes it a bit better, but there is still an error


2019-02-22 00:03:56,173 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
   - Could not resolve ResourceManager address 
akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, 
retrying in 1 ms: Ask timed out on 
[ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/),
 Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of 
type "akka.actor.Identify"..
2019-02-22 00:04:16,213 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not 
resolve ResourceManager address 
akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, 
retrying in 1 ms: Ask timed out on 
[ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/),
 Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of 
type "akka.actor.Identify"..
2019-02-22 00:04:36,253 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not 
resolve ResourceManager address 
akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, 
retrying in 1 ms: Ask timed out on 
[ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/),
 Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of 
type "akka.actor.Identify"..
2019-02-22 00:04:56,293 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not 
resolve ResourceManager address 
akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, 
retrying in 1 ms: Ask timed out on 
[ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/),
 Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of 
type "akka.actor.Identify”..

in the task manager, and

2019-02-21 23:59:46,479 ERROR akka.remote.EndpointWriter
- dropping message [class akka.actor.ActorSelectionMessage] for 
non-local recipient 
[Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at 
[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are 
[akka.tcp://flink@127.0.0.1:6123]
2019-02-21 23:59:57,808 ERROR akka.remote.EndpointWriter
- dropping message [class akka.actor.ActorSelectionMessage] for 
non-local recipient 
[Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at 
[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are 
[akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:06,519 ERROR akka.remote.EndpointWriter
- dropping message [class akka.actor.ActorSelectionMessage] for 
non-local recipient 
[Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at 
[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are 
[akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:17,849 ERROR akka.remote.EndpointWriter
- dropping message [class akka.actor.ActorSelectionMessage] for 
non-local recipient 
[Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at 
[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are 
[akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:26,558 ERROR akka.remote.EndpointWriter
- dropping message [class akka.actor.ActorSelectionMessage] for 
non-local recipient 
[Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at 
[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are 
[akka.tcp://flink@127.0.0.1:6123]
2019-02-22 00:00:37,888 ERROR akka.remote.EndpointWriter
- dropping message [class akka.actor.ActorSelectionMessage] for 
non-local recipient 
[Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at 
[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are 
[akka.tcp://flink@127.0.0.1:6123]

in the job manager.

Port 6123 is opened in both deployments; here is the Job Manager deployment:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: {{ template "fullname" . }}-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      annotations:
        prometheus.io/scrape: 'true'
        prometheus.io/port: '9249'
      labels:
        server: flink
        app: {{ template "fullname" . }}
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: {{ .Values.image }}:{{ .Values.imageTag }}
        imagePullPolicy: {{ .Values.imagePullPolicy }}
        args:
        - jobmanager
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        env:
        - name: CONTAINER_METRIC_PORT
          value: '{{ 

Re: Jira issue Flink-11127

2019-02-21 Thread Boris Lublinsky

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 21, 2019, at 2:05 AM, Konstantin Knauf  
> wrote:
> 
> Hi Boris, 
> 
> the exact command depends on the docker-entrypoint.sh script and the image 
> you are using. For the example contained in the Flink repository it is 
> "task-manager", I think. The important thing is to pass "taskmanager.host" to 
> the Taskmanager process. You can verify by checking the Taskmanager logs. 
> These should contain lines like below:
> 
> 2019-02-21 08:03:00,004 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -  Program 
> Arguments:
> 2019-02-21 08:03:00,008 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - 
> -Dtaskmanager.host=10.12.10.173
> 
> In the Jobmanager logs you should see that the Taskmanager is registered 
> under the IP above in a line similar to:
> 
> 2019-02-21 08:03:26,874 INFO  
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
> Registering TaskManager with ResourceID a0513ba2c472d2d1efc07626da9c1bda 
> (akka.tcp://flink@10.12.10.173:46531/user/taskmanager_0 
> ) at ResourceManager
> 
> A service per Taskmanager is not required. The purpose of the config 
> parameter is that the Jobmanager addresses the taskmanagers by IP instead of 
> hostname.
> 
> Hope this helps!
> 
> Cheers, 
> 
> Konstantin
> 
> 
> 
> On Wed, Feb 20, 2019 at 4:37 PM Boris Lublinsky 
> mailto:boris.lublin...@lightbend.com>> wrote:
> Also, the suggested workaround does not quite work.
> 2019-02-20 15:27:43,928 WARN  akka.remote.ReliableDeliverySupervisor
>   - Association with remote system
> [akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is
> now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by:
> [flink-taskmanager-1: No address associated with hostname]
> 2019-02-20 15:27:48,750 ERROR
> org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler  -
> Caught exception
> 
> I think the problem is that its trying to connect to flink-task-manager-1
> 
> Using busybox to experiment with nslookup, I can see
> / # nslookup flink-taskmanager-1.flink-taskmanager
> Server:    10.0.11.151
> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>
> Name:      flink-taskmanager-1.flink-taskmanager
> Address 1: 10.131.2.136 flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
> / # nslookup flink-taskmanager-1
> Server:    10.0.11.151
> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>
> nslookup: can't resolve 'flink-taskmanager-1'
> / # nslookup flink-taskmanager-0.flink-taskmanager
> Server:    10.0.11.151
> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>
> Name:      flink-taskmanager-0.flink-taskmanager
> Address 1: 10.131.0.111 flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
> / # nslookup flink-taskmanager-0
> Server:    10.0.11.151
> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>
> nslookup: can't resolve 'flink-taskmanager-0'
> / #
> 
> So the name should be postfixed with the service name. How do I force it? I 
> suspect I am missing a config parameter.
> 
>  
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/ 
>> On Feb 19, 2019, at 4:33 AM, Konstantin Knauf > > wrote:
>> 
>> Hi Boris, 
>> 
>> the solution is actually simpler than it sounds from the ticket. The only 
>> thing you need to do is to set the "taskmanager.host" to the Pod's IP 
>> address in the Flink configuration. The easiest way to do this is to pass 
>> this config dynamically via a command-line parameter. 
>> 
>> The Deployment spec could looks something like this:
>> containers:
>> - name: taskmanager
>>   [...]
>>   args:
>>   - "taskmanager.sh"
>>   - "start-foreground"
>>   - "-Dtaskmanager.host=$(K8S_POD_IP)"
>>   [...]
>>   env:
>>   - name: K8S_POD_IP
>> valueFrom:
>>   fieldRef:
>> fieldPath: status.podIP
>> 
>> Hope this helps and let me know if this works. 
>> 
>> Best, 
>> 
>> Konstantin
>> 
>> On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky 
>> mailto:boris.lublin...@lightbend.com>> wrote:
>> I was looking at this issue 
>> https://issues.apache.org/jira/browse/FLINK-11127 
>> 
>> Apparently there is a workaround for it.
>> Is it possible provide the complete helm chart for it.
>> Bits and pieces are in the ticket, but it would be nice to see the full chart
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com 

Re: Metrics for number of "open windows"?

2019-02-21 Thread Rong Rong
Hi Andrew,

I am assuming you are actually using a customized WindowAssigner, Trigger and
process function.
I think the best way for you to track in-flight, not-yet-triggered windows
is to emit metrics from these 3 pieces.

Looking at the window operator, I don't think there is a metric
(gauge) that tracks how many windows have not yet fired.
This information is available in the KeyedStateBackend, but I don't think
the KeyedStateBackend emits any metrics related to what you want.
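
For illustration, here is a rough sketch of the trigger part of that idea,
assuming event-time TimeWindows. The metric name, the firing logic and the
"first element seen" state are all placeholders, and the counter lives per
subtask and is not checkpointed, so it resets to zero on recovery:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class OpenWindowCountingTrigger extends Trigger<Object, TimeWindow> {

    // Per-window flag so the counter is only incremented for the first element of a window.
    private final ValueStateDescriptor<Boolean> seenDesc =
            new ValueStateDescriptor<>("seen", Types.BOOLEAN);

    private transient Counter openWindows;

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        if (openWindows == null) {
            openWindows = ctx.getMetricGroup().counter("openWindows");
        }
        ValueState<Boolean> seen = ctx.getPartitionedState(seenDesc);
        if (seen.value() == null) {
            seen.update(true);
            openWindows.inc();                      // first element of this window: one more in-flight window
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        if (openWindows != null) {
            openWindows.dec();                      // window purged: no longer in flight
        }
        ctx.getPartitionedState(seenDesc).clear();
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}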

Thanks,
Rong

On Tue, Feb 19, 2019 at 12:14 PM Andrew Roberts  wrote:

> Hello,
>
> I’m trying to track the number of currently-in-state windows in a keyed,
> windowed stream (stream.keyBy(…).window(…).trigger(…).process(…)) using
> Flink metrics. Are there any built in? Or any good approaches for
> collecting this data?
>
> Thanks,
>
> Andrew


Re: Jira issue Flink-11127

2019-02-21 Thread Boris Lublinsky
Konstantin, it still does not quite work.
The IP is still in place, but…

Here is the Job Manager log:
metrics.reporters: prom
metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
Starting Job Manager
config file: 
jobmanager.rest.address: crabby-kudu-fdp-flink-jobmanager-service
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
rest.port: 8081
metrics.reporters: prom
metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249
blob.server.port: 6124
query.server.port: 6125
Starting standalonesession as a console application on host 
crabby-kudu-fdp-flink-jobmanager-85c8d799db-46rj2.
2019-02-21 21:00:37,803 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 

2019-02-21 21:00:37,804 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Starting 
StandaloneSessionClusterEntrypoint (Version: 1.7.1, Rev:89eafb4, 
Date:14.12.2018 @ 15:48:34 GMT)
2019-02-21 21:00:37,804 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  OS current 
user: ?
2019-02-21 21:00:37,805 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Current 
Hadoop/Kerberos user: 
2019-02-21 21:00:37,805 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM: OpenJDK 
64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
2019-02-21 21:00:37,805 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Maximum heap 
size: 981 MiBytes
2019-02-21 21:00:37,805 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JAVA_HOME: 
/docker-java-home/jre
2019-02-21 21:00:37,805 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  No Hadoop 
Dependency available
2019-02-21 21:00:37,805 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM Options:
2019-02-21 21:00:37,805 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xms1024m
2019-02-21 21:00:37,805 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - -Xmx1024m
2019-02-21 21:00:37,805 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2019-02-21 21:00:37,806 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2019-02-21 21:00:37,806 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Program 
Arguments:
2019-02-21 21:00:37,806 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - --configDir
2019-02-21 21:00:37,806 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
/opt/flink/conf
2019-02-21 21:00:37,806 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
--executionMode
2019-02-21 21:00:37,806 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - cluster
2019-02-21 21:00:37,806 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Classpath: 
/opt/flink/lib/flink-metrics-prometheus-1.7.1.jar:/opt/flink/lib/flink-python_2.11-1.7.1.jar:/opt/flink/lib/flink-queryable-state-runtime_2.11-1.7.1.jar:/opt/flink/lib/flink-table_2.11-1.7.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.7.1.jar:::
2019-02-21 21:00:37,806 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 

2019-02-21 21:00:37,808 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Registered UNIX 
signal handlers for [TERM, HUP, INT]
2019-02-21 21:00:37,822 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rest.address, 
crabby-kudu-fdp-flink-jobmanager-service
2019-02-21 21:00:37,822 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.port, 6123
2019-02-21 21:00:37,823 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.heap.size, 1024m
2019-02-21 21:00:37,823 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.heap.size, 1024m
2019-02-21 21:00:37,823 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.numberOfTaskSlots, 1
2019-02-21 21:00:37,823 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: parallelism.default, 1
2019-02-21 21:00:37,824 INFO  

Re: FLIP-16, FLIP-15 Status Updates?

2019-02-21 Thread Paris Carbone
I created these FLIPs a while back, sorry for being late to this discussion but 
I can try to elaborate.

The idea from FLIP-16 is proven to be correct [1] (see chapter 3) and I think 
it is the only way to go but I have been in favour of providing channel 
implementations with checkpoint behaviour built in.
I do have some pretty outdated Flink forks with these patches resolved but it 
is not an elegant implementation. If I am not mistaken people looked into 
checkpointing in-transit channel state in Blink and that is a good use case to 
(re)use that part of the logic imho.

Regarding FLIP-15 and beyond, there are a few things we have to agree on before 
stream iterations are finalised.
We have looked a LOT into this ([1] see chapter 6) also together with Vasia 
(CC) and discussed with other committers to allow for a BSP iterative model on 
windows and extensions to dynamic graphs, ML etc.
Some of these ideas were also planned to be integrated in Blink but I think 
this is still in the prototyping phase at Alibaba, please correct me if I am 
wrong.
I am very happy there is finally some interest in this! The FLIP discussions 
were dead for years. Maybe it is time to start planning it properly together, 
it is a super cool feature ;)

cheers
Paris

[1] http://kth.diva-portal.org/smash/get/diva2:1240814/FULLTEXT01.pdf



On 21 Feb 2019, at 19:17, Stephan Ewen 
mailto:se...@apache.org>> wrote:

Hi John!

I know some committers are working on iterations, but on a bigger update. That 
might subsume the FLIPs 15 and 16 eventually.
I believe they will share some part of that soon (in a few weeks).

Best,
Stephan


On Tue, Feb 19, 2019 at 5:45 PM John Tipper 
mailto:john_tip...@hotmail.com>> wrote:
Hi Timo,

That’s great, thank you very much. If I’d like to contribute, is it best to 
wait until the roadmap has been published? And is this the best list to ask on, 
or is the development mailing list better?

Many thanks,

John

Sent from my iPhone

> On 19 Feb 2019, at 16:29, Timo Walther 
> mailto:twal...@apache.org>> wrote:
>
> Hi John,
>
> you are right that there was not much progress in the last years around these 
> two FLIPs. Mostly due to shift of priorities. However, with the big Blink 
> code contribution from Alibaba and joint development forces for a unified 
> batch and streaming runtime [1], it is very likely that also iterations and 
> thus machine learning algorithms will see more development efforts.
>
> The community is working on roadmap page for the website. And I can already 
> reveal that a new iterations model is mentioned there. The new Flink roadmap 
> page can be expected in the next 2-3 weeks.
>
> I hope this information helps.
>
> Regards,
> Timo
>
> [1] 
> https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
>
>> Am 19.02.19 um 12:47 schrieb John Tipper:
>> Hi All,
>>
>> Does anyone know what the current status is for FLIP-16 (loop fault 
>> tolerance) and FLIP-15 (redesign iterations) please? I can see lots of work 
>> back in 2016, but it all seemed to stop and go quiet since about March 2017. 
>> I see iterations as offering very interesting capabilities for Flink, so it 
>> would be good to understand how we can get this moving again.
>>
>> Many thanks,
>>
>> John
>>
>> Sent from my iPhone
>
>



Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-21 Thread Rong Rong
Hi Stephan,

Yes, I completely agree. Jincheng & Jark gave some very valuable feedback
and suggestions, and I think we can definitely move the conversation forward
to reach a more concrete doc first before we put it into the roadmap. Thanks
for reviewing it and driving the roadmap effort!

--
Rong

On Thu, Feb 21, 2019 at 8:50 AM Stephan Ewen  wrote:

> Hi Rong Rong!
>
> I would add the security / kerberos threads to the roadmap. They seem to
> be advanced enough in the discussions so that there is clarity what will
> come.
>
> For the window operator with slicing, I would personally like to see the
> discussion advance and have some more clarity and consensus on the feature
> before adding it to the roadmap. Not having that in the first version of
> the roadmap does not mean there will be no activity. And when the
> discussion advances well in the next weeks, we can update the roadmap soon.
>
> What do you think?
>
> Best,
> Stephan
>
>
> On Thu, Feb 14, 2019 at 5:46 PM Rong Rong  wrote:
>
>> Hi Stephan,
>>
>> Thanks for the clarification, yes I think these issues have already been
>> discussed in previous mailing list threads [1,2,3].
>>
>> I also agree that updating the "official" roadmap every release is a very
>> good idea to avoid frequent update.
>> One question I might've been a bit confused about is: are we suggesting to
>> keep one roadmap on the documentation site (e.g. [4]) per release, or
>> simply just one most up-to-date roadmap in the main website [5] ?
>> Just like the release notes in every release, the former will probably
>> provide a good tracker for users to look back at previous roadmaps as well
>> I am assuming.
>>
>> Thanks,
>> Rong
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>>
>> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
>> [5] https://flink.apache.org/
>>
>> On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen  wrote:
>>
>>> I think the website is better as well.
>>>
>>> I agree with Fabian that the wiki is not so visible, and visibility is
>>> the main motivation.
>>> This type of roadmap overview would not be updated by everyone - letting
>>> committers update the roadmap means the listed threads are actually
>>> happening at the moment.
>>>
>>>
>>> On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske 
>>> wrote:
>>>
 Hi,

 I like the idea of putting the roadmap on the website because it is
 much more visible (and IMO more credible, obligatory) there.
 However, I share the concerns about frequent updates.

 It think it would be great to update the "official" roadmap on the
 website once per release (-bugfix releases), i.e., every three month.
 We can use the wiki to collect and draft the roadmap for the next
 update.

 Best, Fabian


 Am Do., 14. Feb. 2019 um 11:03 Uhr schrieb Jeff Zhang >>> >:

> Hi Stephan,
>
> Thanks for this proposal. It is a good idea to track the roadmap. One
> suggestion is that it might be better to put it into wiki page first.
> Because it is easier to update the roadmap on wiki compared to on flink 
> web
> site. And I guess we may need to update the roadmap very often at the
> beginning as there's so many discussions and proposals in community
> recently. We can move it into flink web site later when we feel it could 
> be
> nailed down.
>
> Stephan Ewen  于2019年2月14日周四 下午5:44写道:
>
>> Thanks Jincheng and Rong Rong!
>>
>> I am not deciding a roadmap and making a call on what features should
>> be developed or not. I was only collecting broader issues that are 
>> already
>> happening or have an active FLIP/design discussion plus committer 
>> support.
>>
>> Do we have that for the suggested issues as well? If yes , we can add
>> them (can you point me to the issue/mail-thread), if not, let's try and
>> move the discussion forward and add them to the roadmap overview then.
>>
>> Best,
>> Stephan
>>
>>
>> On Wed, Feb 13, 2019 at 6:47 PM Rong Rong 
>> wrote:
>>
>>> Thanks Stephan for the great proposal.
>>>
>>> This would not only be beneficial for new users but also for
>>> contributors to keep track on all upcoming features.
>>>
>>> I think that better window operator support can also be separately
>>> group into its own category, as they affects both future DataStream API 
>>> and
>>> batch stream unification.
>>> can we also include:
>>> - OVER aggregate for DataStream API separately as @jincheng
>>> suggested.
>>> - Improving sliding window operator [1]
>>>

Is there a Flink DataSet equivalent to Spark's RDD.persist?

2019-02-21 Thread Frank Grimes
Hi,

I'm trying to port an existing Spark job to Flink and have gotten stuck on the
same issue brought up here:
https://stackoverflow.com/questions/46243181/cache-and-persist-datasets

Is there some way to accomplish the same thing in Flink, i.e. avoid
re-computing a particular DataSet when multiple different subsequent
transformations are required on it?

I've even tried explicitly writing out the DataSet to avoid the re-computation,
but I still take an I/O hit for the initial write to HDFS and the subsequent
re-reading of it in the following stages. While it does yield a performance
improvement over no caching at all, it doesn't match the performance I get with
RDD.persist in Spark.
Thanks,
Frank Grimes

Why don't Tuple types implement Comparable?

2019-02-21 Thread Frank Grimes
Hi,

I've recently started to evaluate Flink and have found it odd that its Tuple
types, while Serializable, don't implement java.lang.Comparable. This means that
I either need to provide a KeySelector for many operations or subtype the
Tuple types and provide my own implementation of compareTo for each.

Is there a technical reason why this was omitted?

For example, the jOOQ/jOOL Tuple types all implement Comparable:
https://github.com/jOOQ/jOOL/blob/master/jOOL-java-8/src/main/java/org/jooq/lambda/tuple/Tuple2.java#L39

As an aside, I tried replacing usage of Flink's Tuple types with the jOOL ones,
but they caused a StackOverflowError similar to this one:
https://issues.apache.org/jira/browse/FLINK-3922
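
For reference, a minimal sketch of the KeySelector route mentioned above,
assuming a DataSet<Tuple2<String, Integer>> sorted by its first field (field
positions or names such as 0 / "f0" can be used instead of the selector):

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

public class TupleKeySelectorExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> input =
                env.fromElements(Tuple2.of("b", 2), Tuple2.of("a", 1));

        // Explicit key extraction instead of relying on Comparable tuples.
        DataSet<Tuple2<String, Integer>> sorted = input.sortPartition(
                new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> t) {
                        return t.f0;
                    }
                },
                Order.ASCENDING);

        sorted.print();
    }
}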

Thanks,
Frank Grimes


Re: FLIP-16, FLIP-15 Status Updates?

2019-02-21 Thread Stephan Ewen
Hi John!

I know some committers are working on iterations, but on a bigger update.
That might subsume the FLIPs 15 and 16 eventually.
I believe they will share some part of that soon (in a few weeks).

Best,
Stephan


On Tue, Feb 19, 2019 at 5:45 PM John Tipper  wrote:

> Hi Timo,
>
> That’s great, thank you very much. If I’d like to contribute, is it best
> to wait until the roadmap has been published? And is this the best list to
> ask on, or is the development mailing list better?
>
> Many thanks,
>
> John
>
> Sent from my iPhone
>
> > On 19 Feb 2019, at 16:29, Timo Walther  wrote:
> >
> > Hi John,
> >
> > you are right that there was not much progress in the last years around
> these two FLIPs. Mostly due to shift of priorities. However, with the big
> Blink code contribution from Alibaba and joint development forces for a
> unified batch and streaming runtime [1], it is very likely that also
> iterations and thus machine learning algorithms will see more development
> efforts.
> >
> > The community is working on roadmap page for the website. And I can
> already reveal that a new iterations model is mentioned there. The new
> Flink roadmap page can be expected in the next 2-3 weeks.
> >
> > I hope this information helps.
> >
> > Regards,
> > Timo
> >
> > [1]
> https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
> >
> >> Am 19.02.19 um 12:47 schrieb John Tipper:
> >> Hi All,
> >>
> >> Does anyone know what the current status is for FLIP-16 (loop fault
> tolerance) and FLIP-15 (redesign iterations) please? I can see lots of work
> back in 2016, but it all seemed to stop and go quiet since about March
> 2017. I see iterations as offering very interesting capabilities for Flink,
> so it would be good to understand how we can get this moving again.
> >>
> >> Many thanks,
> >>
> >> John
> >>
> >> Sent from my iPhone
> >
> >
>


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-21 Thread Stephan Ewen
Hi Shaoxuan!

I think adding the web UI improvements makes sense - there is not much open
to discuss there. Will do that.

For the machine learning improvements - that is a pretty big piece and I
think the discussions are still ongoing. I would prefer this to advance a
bit before adding it to the roadmap. The way I proposed the roadmap, it was
meant to reflect the ongoing features where we have consensus on what it
should roughly look like.
We can update the roadmap very soon, once the machine learning discussion
has advanced a bit and has reached the state of a FLIP or so.

What do you think?

Best,
Stephan

On Mon, Feb 18, 2019 at 4:31 PM Shaoxuan Wang  wrote:

> Hi Stephan,
>
> Thanks for summarizing the work into a roadmap. It really
> helps users to understand where Flink is heading. The entire outline
> looks good to me. If appropriate, I would recommend to add another two
> attracting categories in the roadmap.
>
> *Flink ML Enhancement*
>   - Refactor ML pipeline on TableAPI
>   - Python support for TableAPI
>   - Support streaming training & inference.
>   - Seamless integration of DL engines (Tensorflow, PyTorch etc)
>   - ML platform with a group of AI tooling
> Some of these work have already been discussed in the dev mail list.
> Related JIRA (FLINK-11095) and discussion:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
> ;
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Python-and-Non-JVM-Language-Support-in-Flink-td25905.html
>
>
> *Flink-Runtime-Web Improvement*
>   - Much of this comes via Blink
>   - Refactor the entire module to use latest Angular (7.x)
>   - Add resource information at three levels including Cluster,
> TaskManager and Job
>   - Add operator-level topology and data flow tracing
>   - Add new metrics to track the back pressure, filter and data skew
>   - Add log association to Job, Vertex and SubTasks
> Related JIRA (FLINK-10705) and discussion:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Change-underlying-Frontend-Architecture-for-Flink-Web-Dashboard-td24902.html
>
>
> What do you think?
>
> Regards,
> Shaoxuan
>
>
>
> On Wed, Feb 13, 2019 at 7:21 PM Stephan Ewen  wrote:
>
>> Hi all!
>>
>> Recently several contributors, committers, and users asked about making
>> it more visible in which way the project is currently going.
>>
>> Users and developers can track the direction by following the discussion
>> threads and JIRA, but due to the mass of discussions and open issues, it is
>> very hard to get a good overall picture.
>> Especially for new users and contributors, it is very hard to get a quick
>> overview of the project direction.
>>
>> To fix this, I suggest to add a brief roadmap summary to the homepage. It
>> is a bit of a commitment to keep that roadmap up to date, but I think the
>> benefit for users justifies that.
>> The Apache Beam project has added such a roadmap [1]
>> , which was received very well by the
>> community, I would suggest to follow a similar structure here.
>>
>> If the community is in favor of this, I would volunteer to write a first
>> version of such a roadmap. The points I would include are below.
>>
>> Best,
>> Stephan
>>
>> [1] https://beam.apache.org/roadmap/
>>
>> 
>>
>> Disclaimer: Apache Flink is not governed or steered by any one single
>> entity, but by its community and Project Management Committee (PMC). This
>> is not an authoritative roadmap in the sense of a plan with a specific
>> timeline. Instead, we share our vision for the future and major initiatives
>> that are receiving attention and give users and contributors an
>> understanding what they can look forward to.
>>
>> *Future Role of Table API and DataStream API*
>>   - Table API becomes first class citizen
>>   - Table API becomes primary API for analytics use cases
>>   * Declarative, automatic optimizations
>>   * No manual control over state and timers
>>   - DataStream API becomes primary API for applications and data pipeline
>> use cases
>>   * Physical, user controls data types, no magic or optimizer
>>   * Explicit control over state and time
>>
>> *Batch Streaming Unification*
>>   - Table API unification (environments) (FLIP-32)
>>   - New unified source interface (FLIP-27)
>>   - Runtime operator unification & code reuse between DataStream / Table
>>   - Extending Table API to make it convenient API for all analytical use
>> cases (easier mix in of UDFs)
>>   - Same join operators on bounded/unbounded Table API and DataStream API
>>
>> *Faster Batch (Bounded Streams)*
>>   - Much of this comes via Blink contribution/merging
>>   - Fine-grained Fault Tolerance on bounded data (Table API)
>>   - Batch Scheduling on bounded data (Table API)
>>   - External Shuffle Services Support on bounded streams
>>   - Caching of 

Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-21 Thread Stephan Ewen
Hi Rong Rong!

I would add the security / kerberos threads to the roadmap. They seem to be
advanced enough in the discussions so that there is clarity what will come.

For the window operator with slicing, I would personally like to see the
discussion advance and have some more clarity and consensus on the feature
before adding it to the roadmap. Not having that in the first version of
the roadmap does not mean there will be no activity. And when the
discussion advances well in the next weeks, we can update the roadmap soon.

What do you think?

Best,
Stephan


On Thu, Feb 14, 2019 at 5:46 PM Rong Rong  wrote:

> Hi Stephan,
>
> Thanks for the clarification, yes I think these issues have already been
> discussed in previous mailing list threads [1,2,3].
>
> I also agree that updating the "official" roadmap every release is a very
> good idea to avoid frequent update.
> One question I might've been a bit confused about is: are we suggesting to keep
> one roadmap on the documentation site (e.g. [4]) per release, or simply
> just one most up-to-date roadmap in the main website [5] ?
> Just like the release notes in every release, the former will probably
> provide a good tracker for users to look back at previous roadmaps as well
> I am assuming.
>
> Thanks,
> Rong
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>
> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
> [5] https://flink.apache.org/
>
> On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen  wrote:
>
>> I think the website is better as well.
>>
>> I agree with Fabian that the wiki is not so visible, and visibility is
>> the main motivation.
>> This type of roadmap overview would not be updated by everyone - letting
>> committers update the roadmap means the listed threads are actually
>> happening at the moment.
>>
>>
>> On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> I like the idea of putting the roadmap on the website because it is much
>>> more visible (and IMO more credible, obligatory) there.
>>> However, I share the concerns about frequent updates.
>>>
>>> It think it would be great to update the "official" roadmap on the
>>> website once per release (-bugfix releases), i.e., every three month.
>>> We can use the wiki to collect and draft the roadmap for the next
>>> update.
>>>
>>> Best, Fabian
>>>
>>>
>>> Am Do., 14. Feb. 2019 um 11:03 Uhr schrieb Jeff Zhang >> >:
>>>
 Hi Stephan,

 Thanks for this proposal. It is a good idea to track the roadmap. One
 suggestion is that it might be better to put it into wiki page first.
 Because it is easier to update the roadmap on wiki compared to on flink web
 site. And I guess we may need to update the roadmap very often at the
 beginning as there's so many discussions and proposals in community
 recently. We can move it into flink web site later when we feel it could be
 nailed down.

 Stephan Ewen  于2019年2月14日周四 下午5:44写道:

> Thanks Jincheng and Rong Rong!
>
> I am not deciding a roadmap and making a call on what features should
> be developed or not. I was only collecting broader issues that are already
> happening or have an active FLIP/design discussion plus committer support.
>
> Do we have that for the suggested issues as well? If yes , we can add
> them (can you point me to the issue/mail-thread), if not, let's try and
> move the discussion forward and add them to the roadmap overview then.
>
> Best,
> Stephan
>
>
> On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:
>
>> Thanks Stephan for the great proposal.
>>
>> This would not only be beneficial for new users but also for
>> contributors to keep track on all upcoming features.
>>
>> I think that better window operator support can also be separately
>> group into its own category, as they affects both future DataStream API 
>> and
>> batch stream unification.
>> can we also include:
>> - OVER aggregate for DataStream API separately as @jincheng suggested.
>> - Improving sliding window operator [1]
>>
>> One more additional suggestion, can we also include a more extendable
>> security module [2,3] @shuyi and I are currently working on?
>> This will significantly improve the usability for Flink in corporate
>> environments where proprietary or 3rd-party security integration is 
>> needed.
>>
>> Thanks,
>> Rong
>>
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>> [2]
>> 

Re: Can I make an Elasticsearch Sink effectively exactly once?

2019-02-21 Thread Dawid Wysakowicz
Hi,

The procedure you described will not give you exactly-once semantics.
What the cited excerpt means is that a checkpoint will not be considered
finished until pending requests are acknowledged. It does not mean that
those requests are stored on the Flink side. If an error occurs before
those requests are acknowledged, the job will be recovered from the
previous successful checkpoint. Nevertheless, some of the requests that
"belong" to the current checkpoint might already have been sent to ES by
that time; that's where the "at-least-once" delivery comes from.

If you do have a deterministic way of generating the Elasticsearch id, this
semantic should be enough for you though. Any duplicates (by that id)
should just be updated on the ES side.
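
A minimal sketch of that deterministic-id approach, using the Flink
Elasticsearch connector's ElasticsearchSinkFunction (the index name, the
Map-based record and the deterministicId() helper are made up for the
example; retries then become idempotent upserts rather than true
exactly-once):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Map;

public class IdempotentEsSink implements ElasticsearchSinkFunction<Map<String, Object>> {

    @Override
    public void process(Map<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) {
        IndexRequest request = Requests.indexRequest()
                .index("my-index")                 // hypothetical index name
                .type("_doc")
                .id(deterministicId(element))      // stable id derived from the record itself
                .source(element);
        indexer.add(request);                      // a replayed record overwrites the same document
    }

    private String deterministicId(Map<String, Object> element) {
        // Hypothetical: build the id from business fields, never from random/UUID values.
        return element.get("entityId") + "-" + element.get("eventTime");
    }
}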

Best,

Dawid

On 21/02/2019 14:26, Stephen Connolly wrote:
> From how I understand it:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html#elasticsearch-sinks-and-fault-tolerance
>
> the Flink Elasticsearch Sink guarantees at-least-once delivery of
> action requests to Elasticsearch clusters. It does so by waiting
> for all pending action requests in the BulkProcessor at the time
> of checkpoints. This effectively assures that all requests before
> the checkpoint was triggered have been successfully acknowledged
> by Elasticsearch, before proceeding to process more records sent
> to the sink.
>
>
> So I am thinking:
>
>   * If I put a .map(json -> json.set("_id",
> ElasticsearchId.generate()) in front of the Elasticsearch sink
>   * If I have a ActionRequestFailureHandler that drops any ID
> conflicts on the floor
>
> Would this give me exactly once output to Elasticsearch as the
> BulkProcessor's checkpoint would include the "_id" and thus in the
> event of a recovery the duplicates would be detected.
>
> Or is there some additional concern I need to be aware of?
>
> Thanks
>
> -stephenc




Re: Starting Flink cluster and running a job

2019-02-21 Thread Boris Lublinsky
The relevant dependencies are:

val flinkScala          = "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided"
val flinkStreamingScala = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
val flinkKafka          = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion exclude("org.slf4j", "slf4j-log4j12")

I am using SBT.
I tried both connector-kafka and connector-kafka-0.11 - same result.


Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Feb 21, 2019, at 1:38 AM, Konstantin Knauf  
> wrote:
> 
> Hi Boris, 
> 
> can you the relevant parts (dependencies) of your pom.xml? Did you also try 
> without fixing the Kafka version, i.e. running with the Kafka client version 
> provided by the Kafka connector of Flink? Gordon (cc) dealt with FLINK-8741. 
> 
> @Gordon: have you seen this issue with 1.6/1.7 before?
> 
> Cheers, 
> 
> Konstantin
> 
> On Thu, Feb 21, 2019 at 2:19 AM Boris Lublinsky 
> mailto:boris.lublin...@lightbend.com>> wrote:
> I found some more details on this
> The same error for the same application was reported about a year ago 
> http://mail-archives.apache.org/mod_mbox/flink-user/201802.mbox/%3CCAE7GCT4pF74LwyY=tivzhquq50tkjjawfhaw+5phcsx+vos...@mail.gmail.com%3E
>  
> 
> And was due to https://issues.apache.org/jira/browse/FLINK-8741 
> 
> 
> It looks like the same issue is back in 1.7.1 and 1.6.3. I tried with both 
> the latest kafka-connector
> and Kafka-connector-011
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/ 
>> On Feb 19, 2019, at 7:02 PM, Ken Krugler > > wrote:
>> 
>> Hi Boris,
>> 
>> I haven’t seen this exact error, but I have seen similar errors caused by 
>> multiple versions of jars on the classpath.
>> 
>> When I’ve run into this particular "XXX is not an instance of YYY" problem, 
>> it often seems to be caused by a jar that I should have marked as provided 
>> in my pom.
>> 
>> Though I’m typically running on a YARN cluster, not w/K8s, so maybe this 
>> doesn’t apply.
>> 
>> — Ken
>> 
>> PS - I assume you’ve been reading 
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
>>  
>> 
>> 
>> 
>>> On Feb 19, 2019, at 4:34 PM, Boris Lublinsky >> > wrote:
>>> 
>>> Konstantin,
>>> After experimenting with this for a while, I got to the root cause of the 
>>> problem
>>> I am running a version of a Taxi ride travel prediction as my sample.
>>> It works fine in Intellij,
>>> But when I am trying to put it in the docker (standard Debian 1.7 image)
>>> It fails with a following error
>>> 
>>> 
>>> The program finished with the following exception:
>>> 
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
>>> (JobID: 9340e7669e7344ab827fef4ddb5ba73d)
>>> at 
>>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>>> at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>>> at 
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>> at 
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>> at 
>>> com.lightbend.fdp.sample.flink.app.TravelTimePrediction$.main(TravelTimePrediction.scala:89)
>>> at 
>>> com.lightbend.fdp.sample.flink.app.TravelTimePrediction.main(TravelTimePrediction.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>> at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>> at 
>>> 

Re: Submitting job to Flink on yarn timesout on flip-6 1.5.x

2019-02-21 Thread Gary Yao
Hi,

Beginning with Flink 1.7, you cannot use the legacy mode anymore [1][2]. I am
currently working on removing references to the legacy mode in the
documentation [3]. Is there any reason you cannot use the "new mode"?

Best,
Gary

[1] https://flink.apache.org/news/2018/11/30/release-1.7.0.html
[2] https://issues.apache.org/jira/browse/FLINK-10392
[3] https://issues.apache.org/jira/browse/FLINK-11713

On Mon, Feb 18, 2019 at 12:00 PM Richard Deurwaarder 
wrote:

> Hello,
>
> I am trying to upgrade our job from flink 1.4.2 to 1.7.1 but I keep
> running into timeouts after submitting the job.
>
> The flink job runs on our hadoop cluster and starts using Yarn.
>
> Relevant config options seem to be:
>
> jobmanager.rpc.port: 55501
>
> recovery.jobmanager.port: 55502
>
> yarn.application-master.port: 55503
>
> blob.server.port: 55504
>
>
> I've seen the following behavior:
>   - Using the same flink-conf.yaml as we used in 1.4.2: 1.5.6 / 1.6.3 /
> 1.7.1 all versions timeout while 1.4.2 works.
>   - Using 1.5.6 with "mode: legacy" (to switch off flip-6) works
>   - Using 1.7.1 with "mode: legacy" gives timeout (I assume this option
> was removed but the documentation is outdated?
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#legacy
> )
>
> When the timeout happens I get the following stacktrace:
>
> INFO class java.time.Instant does not contain a getter for field seconds
> 2019-02-18T10:16:56.815+01:00
> INFO class com.bol.fin_hdp.cm1.domain.Cm1Transportable does not contain a
> getter for field globalId 2019-02-18T10:16:56.815+01:00
> INFO Submitting job 5af931bcef395a78b5af2b97e92dcffe (detached: false).
> 2019-02-18T10:16:57.182+01:00
> INFO 
> 2019-02-18T10:29:27.527+01:00
> INFO The program finished with the following exception:
> 2019-02-18T10:29:27.564+01:00
> INFO org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error. 2019-02-18T10:29:27.601+01:00
> INFO at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
> 2019-02-18T10:29:27.638+01:00
> INFO at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
> 2019-02-18T10:29:27.675+01:00
> INFO at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
> 2019-02-18T10:29:27.711+01:00
> INFO at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:798)
> 2019-02-18T10:29:27.747+01:00
> INFO at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:289)
> 2019-02-18T10:29:27.784+01:00
> INFO at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
> 2019-02-18T10:29:27.820+01:00
> INFO at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1035)
> 2019-02-18T10:29:27.857+01:00
> INFO at
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:)
> 2019-02-18T10:29:27.893+01:00
> INFO at java.security.AccessController.doPrivileged(Native Method)
> 2019-02-18T10:29:27.929+01:00
> INFO at javax.security.auth.Subject.doAs(Subject.java:422)
> 2019-02-18T10:29:27.968+01:00
> INFO at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> 2019-02-18T10:29:28.004+01:00
> INFO at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 2019-02-18T10:29:28.040+01:00
> INFO at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:)
> 2019-02-18T10:29:28.075+01:00
> INFO Caused by: java.lang.RuntimeException:
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. 2019-02-18T10:29:28.110+01:00
> INFO at
> com.bol.fin_hdp.job.starter.IntervalJobStarter.startJob(IntervalJobStarter.java:43)
> 2019-02-18T10:29:28.146+01:00
> INFO at
> com.bol.fin_hdp.job.starter.IntervalJobStarter.startJobWithConfig(IntervalJobStarter.java:32)
> 2019-02-18T10:29:28.182+01:00
> INFO at com.bol.fin_hdp.Main.main(Main.java:8)
> 2019-02-18T10:29:28.217+01:00
> INFO at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2019-02-18T10:29:28.253+01:00
> INFO at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2019-02-18T10:29:28.289+01:00
> INFO at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2019-02-18T10:29:28.325+01:00
> INFO at java.lang.reflect.Method.invoke(Method.java:498)
> 2019-02-18T10:29:28.363+01:00
> INFO at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> 2019-02-18T10:29:28.400+01:00
> INFO ... 12 more 2019-02-18T10:29:28.436+01:00
> INFO Caused by:
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. 2019-02-18T10:29:28.473+01:00
> INFO at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
> 

Re: How to debug difference between Kinesis and Kafka

2019-02-21 Thread Stephen Connolly
On Thu, 21 Feb 2019 at 14:00, Dawid Wysakowicz 
wrote:

> If an event arrived at the WindowOperator before the Watermark, then it will
> be accounted for in the window aggregation and put into state. Once that state
> gets checkpointed, this same event won't be processed again. In other words, if
> a checkpoint succeeds, elements that produced the corresponding state won't be
> processed again. You may want to read these docs for further
> understanding [1].
>
> What I meant by reprocessing is when you want to reprocess the same input
> records, e.g. when you want to rerun your job once again on data from a past
> week. This computation might result in different results than the original
> ones because Watermarks might get generated after different elements, as they
> are bound by "ProcessingTime".
>
Ahh that clarifies. Nope we are processing the stream of events Taylor
Swift style... a.k.a. "We are never processing them again, like ever".

The stream of events is too much data to retain it all. Kinesis will just
keep 1 day's history for recovery.

I'd read [1] before, but then when you mentioned "you might get different
results in case of reprocessing" I started to think that maybe the
Watermarks are the Barriers, but after your clarification I'm back to
thinking they are separate, similar mechanisms operating in the stream.


> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html#checkpointing
> On 21/02/2019 14:42, Stephen Connolly wrote:
>
>
>
> On Thu, 21 Feb 2019 at 13:36, Dawid Wysakowicz 
> wrote:
>
>> It is definitely a solution ;)
>>
>> You should be aware of the downsides though:
>>
>>- you might get different results in case of reprocessing
>>- you might drop some data as late, due to some delays in processing,
>>    if the events arrive later than the "ProcessingTime" threshold
>>
>> So I have a separate stream processor from the "late" side of my window
> that works out what the update is.
>
> But I guess the question I have is around what happens with reprocessing.
>
> 1. Event 1 goes into the window aggregation because it is before the
> watermark
>
> 2. State gets checkpointed
>
> 3. Crash
>
> 4. Recover
>
> Will Event 1 now go to the late stream or will it be tagged as having been
> included into the state in the checkpoint.
>
> I don't mind if Event 1 gets included in the window's "create event count
> for timebox" output or the "update event count for timebox from late
> events" output as long as it is always one and only one of those paths.
>
>
>>
>>
>> Best,
>>
>> Dawid
>> On 21/02/2019 14:18, Stephen Connolly wrote:
>>
>> Yes, it was the "watermarks for event time when no events for that shard"
>> problem.
>>
>> I am now investigating whether we can use a blended watermark of
>> max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
>> idle shards do not cause excessive data retention.
>>
>> Is that the best solution?
>>
>> On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Stephen,
>>>
>>> Watermark for a single operator is the minimum of Watermarks received
>>> from all inputs, therefore if one of your shards/operators does not have
>>> incoming data it will not produce Watermarks thus the Watermark of
>>> WindowOperator will not progress. So this is sort of an expected behavior.
>>>
>>> I recommend reading the docs linked by Congxian, especially this
>>> section[1].
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>>> On 19/02/2019 14:31, Stephen Connolly wrote:
>>>
>>> Hmmm my suspicions are now quite high. I created a file source that just
>>> replays the events straight then I get more results
>>>
>>> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
>>> stephen.alan.conno...@gmail.com> wrote:
>>>
 Hmmm after expanding the dataset such that there was additional data
 that ended up on shard-0 (everything in my original dataset was
 coincidentally landing on shard-1) I am now getting output... should I
 expect this kind of behaviour if no data arrives at shard-0 ever?

 On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
 stephen.alan.conno...@gmail.com> wrote:

> Hi, I’m having a strange situation and I would like to know where I
> should start trying to debug.
>
> I have set up a configurable swap in source, with three
> implementations:
>
> 1. A mock implementation
> 2. A Kafka consumer implementation
> 3. A Kinesis consumer implementation
>
> From injecting a log and no-op map function I can see that all three
> sources pass through the events correctly.
>
> I then have a window based on event time stamps… and from inspecting
> the aggregation function I can see that the data is getting aggregated…,
> I’m using the `.aggregate(AggregateFunction.WindowFunction)` variant so
> that I can retrieve 
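
For reference, a minimal sketch of that `.aggregate(AggregateFunction, WindowFunction)` variant mentioned above (the event and key types here are illustrative, not taken from the job in the thread) looks roughly like this:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch: count (key, value) events per key and 1-minute event-time window,
// emitting (key, windowEnd, count). The incremental AggregateFunction does the
// counting; the WindowFunction only attaches the key and window metadata.
public class AggregateWithKeySketch {

    public static DataStream<Tuple3<String, Long, Long>> countPerKey(DataStream<Tuple2<String, Long>> events) {
        return events
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .aggregate(
                        new AggregateFunction<Tuple2<String, Long>, Long, Long>() {
                            @Override public Long createAccumulator() { return 0L; }
                            @Override public Long add(Tuple2<String, Long> value, Long acc) { return acc + 1; }
                            @Override public Long getResult(Long acc) { return acc; }
                            @Override public Long merge(Long a, Long b) { return a + b; }
                        },
                        new WindowFunction<Long, Tuple3<String, Long, Long>, String, TimeWindow>() {
                            @Override
                            public void apply(String key, TimeWindow window, Iterable<Long> counts,
                                              Collector<Tuple3<String, Long, Long>> out) {
                                // "counts" holds exactly one pre-aggregated value per fired window.
                                out.collect(Tuple3.of(key, window.getEnd(), counts.iterator().next()));
                            }
                        });
    }
}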

Re: How to debug difference between Kinesis and Kafka

2019-02-21 Thread Dawid Wysakowicz
If an event arrived at the WindowOperator before the Watermark, then it will
be accounted for in the window aggregation and put in state. Once that state
gets checkpointed, this same event won't be processed again. In other
words, if a checkpoint succeeds, elements that produced the corresponding
state won't be processed again. You may want to read these docs for
further understanding[1].

What I meant by reprocessing is when you want to reprocess the same
input records, e.g. you want to rerun your job once again on data from a
past week. This computation might produce different results than the
original ones because Watermarks might get generated after different
elements, as they are bound by "ProcessingTime".

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html#checkpointing

On 21/02/2019 14:42, Stephen Connolly wrote:
>
>
> On Thu, 21 Feb 2019 at 13:36, Dawid Wysakowicz  > wrote:
>
> It is definitely a solution ;)
>
> You should be aware of the downsides though:
>
>   * you might get different results in case of reprocessing
>   * you might drop some data as late, due to some delays in
> processing, if the events arrive later then the
> "ProcessingTime" threshold
>
> So I have a separate stream processor from the "late" side of my
> window that works out what the update is.
>
> But I guess the question I have is around what happens with reprocessing.
>
> 1. Event 1 goes into the window aggregation because it is before the
> watermark
>
> 2. State gets checkpointed
>
> 3. Crash
>
> 4. Recover
>
> Will Event 1 now go to the late stream or will it be tagged as having
> been included into the state in the checkpoint.
>
> I don't mind if Event 1 gets included in the window's "create event
> count for timebox" output or the "update event count for timebox from
> late events" output as long as it is always one and only one of those
> paths.
>  
>
> Best,
>
> Dawid
>
> On 21/02/2019 14:18, Stephen Connolly wrote:
>> Yes, it was the "watermarks for event time when no events for
>> that shard" problem.
>>
>> I am now investigating whether we can use a blended watermark of
>> max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min)
>> to ensure idle shards do not cause excessive data retention.
>>
>> Is that the best solution?
>>
>> On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz
>> mailto:dwysakow...@apache.org>> wrote:
>>
>> Hi Stephen,
>>
>> Watermark for a single operator is the minimum of Watermarks
>> received from all inputs, therefore if one of your
>> shards/operators does not have incoming data it will not
>> produce Watermarks thus the Watermark of WindowOperator will
>> not progress. So this is sort of an expected behavior.
>>
>> I recommend reading the docs linked by Congxian, especially
>> this section[1].
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>>
>> On 19/02/2019 14:31, Stephen Connolly wrote:
>>> Hmmm my suspicions are now quite high. I created a file
>>> source that just replays the events straight then I get more
>>> results
>>>
>>> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly
>>> >> > wrote:
>>>
>>> Hmmm after expanding the dataset such that there was
>>> additional data that ended up on shard-0 (everything in
>>> my original dataset was coincidentally landing on
>>> shard-1) I am now getting output... should I expect this
>>> kind of behaviour if no data arrives at shard-0 ever?
>>>
>>> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly
>>> >> > wrote:
>>>
>>> Hi, I’m having a strange situation and I would like
>>> to know where I should start trying to debug.
>>>
>>> I have set up a configurable swap in source, with
>>> three implementations:
>>>
>>> 1. A mock implementation
>>> 2. A Kafka consumer implementation
>>> 3. A Kinesis consumer implementation
>>>
>>> From injecting a log and no-op map function I can
>>> see that all three sources pass through the events
>>> correctly.
>>>
>>> I then have a window based on event time stamps… and
>>> from inspecting the aggregation function I can see
>>> that the data is getting aggregated…, I’m using the
>>> `.aggregate(AggregateFunction.WindowFunction)`
>>> variant so that I can retrieve the key
>>>
>>> 

Docker using flink socketwordcount example

2019-02-21 Thread Samet Yılmaz
I have a question about a topic, but I could not find a solution. Could you
help?
My question:
https://stackoverflow.com/questions/54806830/docker-using-flink-socketwordcount-example-apache-flink



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Standalone cluster - production settings

2019-02-21 Thread Hung
/ Each job has 3 asynch operators 
with Executors with thread counts of 20,20,100/

Flink handles parallelism for you. If you want a higher parallelism for an
operator, you can call setParallelism()
for example,

flatMap(new Mapper1()).setParallelism(20)
flatMap(new Mapper2()).setParallelism(20)
flatMap(new Mapper3()).setParallelism(100)

You can check the official document here
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/parallel.html#setting-the-parallelism

/Currently we are using parallelism = 1/
I guess you set the job-level parallelism.

I would suggest replacing the Executors with Flink's own parallelism. It
would be more efficient, since you wouldn't create another thread pool
on top of the one that Flink already provides
(I may not be describing this concept exactly right).

Cheers,

Sendoh





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Reduce one event under multiple keys

2019-02-21 Thread Stephen Connolly
Thanks!

On Mon, 18 Feb 2019 at 12:36, Fabian Hueske  wrote:

> Hi Stephen,
>
> Sorry for the late response.
> If you don't need to match open and close events, your approach of using a
> flatMap to fan-out for the hierarchical folder structure and a window
> operator (or two for open and close) for counting and aggregating should be
> a good design.
>
> Best, Fabian
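
A minimal sketch of that fan-out flatMap (the (source, action, path) tuple layout is illustrative, not from the original job) could look like:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

// Sketch: fan one (source, action, path) event out to one record per ancestor path,
// so that windows can aggregate per (source, folder) key. Assumes absolute paths.
class PathFanOut implements FlatMapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>> {
    @Override
    public void flatMap(Tuple3<String, String, String> event, Collector<Tuple3<String, String, String>> out) {
        String path = event.f2;                              // e.g. "/foo/bar/README.txt"
        out.collect(Tuple3.of(event.f0, event.f1, "/"));     // the root folder
        int idx = path.indexOf('/', 1);
        while (idx > 0) {
            out.collect(Tuple3.of(event.f0, event.f1, path.substring(0, idx)));  // each ancestor folder
            idx = path.indexOf('/', idx + 1);
        }
        out.collect(Tuple3.of(event.f0, event.f1, path));    // the file itself
    }
}

The fanned-out records can then be keyed on (source, path prefix), e.g. keyBy(0, 2), before the window aggregation.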
>
> Am Mo., 11. Feb. 2019 um 11:29 Uhr schrieb Stephen Connolly <
> stephen.alan.conno...@gmail.com>:
>
>>
>>
>> On Mon, 11 Feb 2019 at 09:42, Fabian Hueske  wrote:
>>
>>> Hi Stephen,
>>>
>>> A window is created with the first record that is assigned to it.
>>> If the windows are based on time and a key, then no window will be
>>> created (and no space occupied) if there is no first record for a
>>> key and time interval.
>>>
>>> Anyway, if tracking the number of open files & average opening time is
>>> your use case, you might want to implement the logic with a ProcessFunction
>>> instead of a window.
>>> The reason is that time windows don't share state, i.e., the
>>> information about an opened but not yet closed file would not be "carried
>>> over" to the next window.
>>> However, if you use a ProcessFunction, you are responsible for cleaning
>>> up the state.
>>>
>>
>> Ahh but I am cheating by ensuring the events are rich enough that I do
>> not need to match them.
>>
>> I get the "open" (they are not really "open" events - I have mapped to an
>> analogy... it might be more like a build job start events... or not... I'm
>> not at liberty to say ;-) ) events because I need to count the number of
>> "open"s per time period.
>>
>> I get the "close" events and they include the duration plus other
>> information that can then be transformed into the required metrics... yes I
>> could derive the "open" from the "close" by subtracting the duration but:
>>
>> 1. they would cross window boundaries quite often, leading to repeated
>> fetch-update-write operations on the backing data store
>> 2. they wouldn't be as "live" and one of the things we need to know is
>> how many "open"s there are in the previous window... given some durations
>> can be many days, waiting for the "close" event to create the "open" metric
>> would not be a good plan.
>>
>> Basically, I am pushing some of the calculations to the edge where there
>> is state that makes those calculations cheap and then the rich events are
>> *hopefully* easy to aggregate with just simple aggregation functions that
>> only need to maintain the running total... at least that's what the PoC I
>> am experimenting with in Flink should show
>>
>>
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
>>> stephen.alan.conno...@gmail.com>:
>>>


 On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler 
 wrote:

> This sounds reasonable to me.
>
> I'm a bit confused by this question: "*Additionally, I am (naïvely)
> hoping that if a window has no events for a particular key, the
> memory/storage costs are zero for that key.*"
>
> Are you asking whether a key that was received in window X (as part of
> an event) is still present in window X+1? If so, then the answer is no; a
> key will only be present in a given window if an event was received that
> fits into that window.
>

 To confirm:

 So let's say I'm tracking the average time a file is opened in folders.

 In window N we get the events:

 {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}

 {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
 {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
 guide.txt"}
 {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
 guide.txt"}

 So there will be aggregates stored for
 ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
 ("ca:fe:ba:be","/foo/bar/README.txt"), etc

 In window N+1 we do not get any events at all.

 So the memory used by my aggregation functions from window N will be
 freed and the storage will be effectively zero (modulo any follow on
 processing that might be on a longer window)

 This seems to be what you are saying... in which case my naïve hope
 was not so naïve! w00t!


>
> On 08.02.2019 13:21, Stephen Connolly wrote:
>
> Ok, I'll try and map my problem into something that should be familiar
> to most people.
>
> Consider collection of PCs, each of which has a unique ID, e.g.
> ca:fe:ba:be, de:ad:be:ef, etc.
>
> Each PC has a tree of local files. Some of the file paths are
> coincidentally the same names, but there is no file sharing between PCs.
>
> I need to produce metrics about how often files are opened and how
> long they are open for.
>
> I need for every X minute tumbling window 

Re: How to debug difference between Kinesis and Kafka

2019-02-21 Thread Stephen Connolly
On Thu, 21 Feb 2019 at 13:36, Dawid Wysakowicz 
wrote:

> It is definitely a solution ;)
>
> You should be aware of the downsides though:
>
>- you might get different results in case of reprocessing
>- you might drop some data as late, due to some delays in processing,
>if the events arrive later than the "ProcessingTime" threshold
>
> So I have a separate stream processor from the "late" side of my window
that works out what the update is.

But I guess the question I have is around what happens with reprocessing.

1. Event 1 goes into the window aggregation because it is before the
watermark

2. State gets checkpointed

3. Crash

4. Recover

Will Event 1 now go to the late stream, or will it be tagged as having been
included in the state in the checkpoint?

I don't mind if Event 1 gets included in the window's "create event count
for timebox" output or the "update event count for timebox from late
events" output as long as it is always one and only one of those paths.


>
>
> Best,
>
> Dawid
> On 21/02/2019 14:18, Stephen Connolly wrote:
>
> Yes, it was the "watermarks for event time when no events for that shard"
> problem.
>
> I am now investigating whether we can use a blended watermark of
> max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
> idle shards do not cause excessive data retention.
>
> Is that the best solution?
>
> On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz 
> wrote:
>
>> Hi Stephen,
>>
>> Watermark for a single operator is the minimum of Watermarks received
>> from all inputs, therefore if one of your shards/operators does not have
>> incoming data it will not produce Watermarks thus the Watermark of
>> WindowOperator will not progress. So this is sort of an expected behavior.
>>
>> I recommend reading the docs linked by Congxian, especially this
>> section[1].
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>> On 19/02/2019 14:31, Stephen Connolly wrote:
>>
>> Hmmm my suspicions are now quite high. I created a file source that just
>> replays the events straight then I get more results
>>
>> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> Hmmm after expanding the dataset such that there was additional data
>>> that ended up on shard-0 (everything in my original dataset was
>>> coincidentally landing on shard-1) I am now getting output... should I
>>> expect this kind of behaviour if no data arrives at shard-0 ever?
>>>
>>> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
>>> stephen.alan.conno...@gmail.com> wrote:
>>>
 Hi, I’m having a strange situation and I would like to know where I
 should start trying to debug.

 I have set up a configurable swap in source, with three implementations:

 1. A mock implementation
 2. A Kafka consumer implementation
 3. A Kinesis consumer implementation

 From injecting a log and no-op map function I can see that all three
 sources pass through the events correctly.

 I then have a window based on event time stamps… and from inspecting
 the aggregation function I can see that the data is getting aggregated…,
 I’m using the `.aggregate(AggregateFunction.WindowFunction)` variant so
 that I can retrieve the key

 Here’s the strange thing, I only change the source (and each source
 uses the same deserialization function) but:


- When I use either Kafka or my Mock source, the WindowFunction
gets called as events pass the end of the window
- When I use the Kinesis source, however, the window function never
gets called. I have even tried injecting events into kinesis with really
high timestamps to flush the watermarks in my
BoundedOutOfOrdernessTimestampExtractor... but nothing

 I cannot see how this source switching could result in such a different
 behaviour:

 Properties sourceProperties = new Properties();
 ConsumerFactory sourceFactory;
 String sourceName = configParams.getRequired("source");
 switch (sourceName.toLowerCase(Locale.ENGLISH)) {
 case "kinesis":
 sourceFactory = FlinkKinesisConsumer::new;
 copyOptionalArg(configParams, "aws-region",
 sourceProperties, AWSConfigConstants.AWS_REGION);
 copyOptionalArg(configParams, "aws-endpoint",
 sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
 copyOptionalArg(configParams, "aws-access-key",
 sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
 copyOptionalArg(configParams, "aws-secret-key",
 sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
 copyOptionalArg(configParams, "aws-profile",
 sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);

Re: How to debug difference between Kinesis and Kafka

2019-02-21 Thread Dawid Wysakowicz
It is definitely a solution ;)

You should be aware of the downsides though:

  * you might get different results in case of reprocessing
  * you might drop some data as late, due to some delays in processing,
if the events arrive later than the "ProcessingTime" threshold

Best,

Dawid

On 21/02/2019 14:18, Stephen Connolly wrote:
> Yes, it was the "watermarks for event time when no events for that
> shard" problem.
>
> I am now investigating whether we can use a blended watermark of
> max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to
> ensure idle shards do not cause excessive data retention.
>
> Is that the best solution?
>
> On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz  > wrote:
>
> Hi Stephen,
>
> Watermark for a single operator is the minimum of Watermarks
> received from all inputs, therefore if one of your
> shards/operators does not have incoming data it will not produce
> Watermarks thus the Watermark of WindowOperator will not progress.
> So this is sort of an expected behavior.
>
> I recommend reading the docs linked by Congxian, especially this
> section[1].
>
> Best,
>
> Dawid
>
> [1]
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>
> On 19/02/2019 14:31, Stephen Connolly wrote:
>> Hmmm my suspicions are now quite high. I created a file source
>> that just replays the events straight then I get more results
>>
>> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly
>> > > wrote:
>>
>> Hmmm after expanding the dataset such that there was
>> additional data that ended up on shard-0 (everything in my
>> original dataset was coincidentally landing on shard-1) I am
>> now getting output... should I expect this kind of behaviour
>> if no data arrives at shard-0 ever?
>>
>> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly
>> > > wrote:
>>
>> Hi, I’m having a strange situation and I would like to
>> know where I should start trying to debug.
>>
>> I have set up a configurable swap in source, with three
>> implementations:
>>
>> 1. A mock implementation
>> 2. A Kafka consumer implementation
>> 3. A Kinesis consumer implementation
>>
>> From injecting a log and no-op map function I can see
>> that all three sources pass through the events correctly.
>>
>> I then have a window based on event time stamps… and from
>> inspecting the aggregation function I can see that the
>> data is getting aggregated…, I’m using the
>> `.aggregate(AggregateFunction.WindowFunction)` variant so
>> that I can retrieve the key
>>
>> Here’s the strange thing, I only change the source (and
>> each source uses the same deserialization function) but:
>>
>>   * When I use either Kafka or my Mock source, the
>> WindowFunction gets called as events pass the end of
>> the window
>>   * When I use the Kinesis source, however, the window
>> function never gets called. I have even tried
>> injecting events into kinesis with really high
>> timestamps to flush the watermarks in my
>> BoundedOutOfOrdernessTimestampExtractor... but nothing
>>
>> I cannot see how this source switching could result in
>> such a different behaviour:
>>
>>         Properties sourceProperties = new Properties();
>>         ConsumerFactory sourceFactory;
>>         String sourceName =
>> configParams.getRequired("source");
>>         switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>>             case "kinesis":
>>                 sourceFactory = FlinkKinesisConsumer::new;
>>                 copyOptionalArg(configParams,
>> "aws-region", sourceProperties,
>> AWSConfigConstants.AWS_REGION);
>>                 copyOptionalArg(configParams,
>> "aws-endpoint", sourceProperties,
>> AWSConfigConstants.AWS_ENDPOINT);
>>                 copyOptionalArg(configParams,
>> "aws-access-key", sourceProperties,
>> AWSConfigConstants.AWS_ACCESS_KEY_ID);
>>                 copyOptionalArg(configParams,
>> "aws-secret-key", sourceProperties,
>> AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>>                 copyOptionalArg(configParams,
>> "aws-profile", sourceProperties,
>> AWSConfigConstants.AWS_PROFILE_NAME);
>>                 break;
>>   

Can I make an Elasticsearch Sink effectively exactly once?

2019-02-21 Thread Stephen Connolly
From how I understand it:

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html#elasticsearch-sinks-and-fault-tolerance

the Flink Elasticsearch Sink guarantees at-least-once delivery of action
> requests to Elasticsearch clusters. It does so by waiting for all pending
> action requests in the BulkProcessor at the time of checkpoints. This
> effectively assures that all requests before the checkpoint was triggered
> have been successfully acknowledged by Elasticsearch, before proceeding to
> process more records sent to the sink.


So I am thinking:


   - If I put a .map(json -> json.set("_id", ElasticsearchId.generate())) in
   front of the Elasticsearch sink
   - If I have an ActionRequestFailureHandler that drops any ID conflicts on
   the floor

Would this give me exactly-once output to Elasticsearch, as the
BulkProcessor's checkpoint would include the "_id" and thus, in the event of
a recovery, the duplicates would be detected?
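
A minimal sketch of that combination, assuming the Elasticsearch 6 connector (Flink 1.6+) and records that carry a pre-generated id next to their JSON payload (the index name, type and record layout are placeholders):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

// Sketch: records are (id, jsonPayload) pairs where the id was generated upstream
// (e.g. in a map) and is therefore part of the checkpointed state.
class IdAwareEsSink implements ElasticsearchSinkFunction<Tuple2<String, String>> {
    @Override
    public void process(Tuple2<String, String> record, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(Requests.indexRequest()
                .index("my-index")                      // placeholder index name
                .type("_doc")                           // placeholder type
                .id(record.f0)                          // deterministic, pre-generated id
                .create(true)                           // make a replayed id fail with 409 instead of overwriting
                .source(record.f1, XContentType.JSON));
    }
}

// Sketch: drop document conflicts (HTTP 409) on the floor, rethrow everything else.
class DropConflictsHandler implements ActionRequestFailureHandler {
    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (restStatusCode == 409) {
            return; // duplicate caused by a replay after recovery, safe to ignore here
        }
        throw failure; // anything else should still fail the sink / trigger retries
    }
}

The sink function would be passed to the ElasticsearchSink builder and the handler registered via its setFailureHandler(...) method; with create(true) a replayed id surfaces as a 409, which the handler then drops.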

Or is there some additional concern I need to be aware of?

Thanks

-stephenc


Re: How to debug difference between Kinesis and Kafka

2019-02-21 Thread Stephen Connolly
Yes, it was the "watermarks for event time when no events for that shard"
problem.

I am now investigating whether we can use a blended watermark of
max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
idle shards do not cause excessive data retention.

Is that the best solution?
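
For reference, a minimal sketch of such a blended periodic assigner (assuming (key, epochMillis) events; the field access and the 1min/5min bounds are illustrative) could look like:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch: an event-time watermark that never lags processing time by more than 5 minutes,
// so an idle shard cannot hold the watermark back indefinitely.
public class BlendedWatermarkAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 60_000L;  // 1 minute
    private static final long MAX_IDLENESS_MS = 300_000L;         // 5 minutes

    private long maxSeenTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, element.f1);
        return element.f1;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Guard against underflow before any event has been seen.
        long eventTimeBound = maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - MAX_OUT_OF_ORDERNESS_MS;
        long processingTimeBound = System.currentTimeMillis() - MAX_IDLENESS_MS;
        return new Watermark(Math.max(eventTimeBound, processingTimeBound));
    }
}

It would replace the BoundedOutOfOrdernessTimestampExtractor via stream.assignTimestampsAndWatermarks(new BlendedWatermarkAssigner()).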

On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz 
wrote:

> Hi Stephen,
>
> Watermark for a single operator is the minimum of Watermarks received from
> all inputs, therefore if one of your shards/operators does not have
> incoming data it will not produce Watermarks thus the Watermark of
> WindowOperator will not progress. So this is sort of an expected behavior.
>
> I recommend reading the docs linked by Congxian, especially this
> section[1].
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
> On 19/02/2019 14:31, Stephen Connolly wrote:
>
> Hmmm my suspicions are now quite high. I created a file source that just
> replays the events straight then I get more results
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> Hmmm after expanding the dataset such that there was additional data that
>> ended up on shard-0 (everything in my original dataset was coincidentally
>> landing on shard-1) I am now getting output... should I expect this kind of
>> behaviour if no data arrives at shard-0 ever?
>>
>> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> Hi, I’m having a strange situation and I would like to know where I
>>> should start trying to debug.
>>>
>>> I have set up a configurable swap in source, with three implementations:
>>>
>>> 1. A mock implementation
>>> 2. A Kafka consumer implementation
>>> 3. A Kinesis consumer implementation
>>>
>>> From injecting a log and no-op map function I can see that all three
>>> sources pass through the events correctly.
>>>
>>> I then have a window based on event time stamps… and from inspecting the
>>> aggregation function I can see that the data is getting aggregated…, I’m
>>> using the `.aggregate(AggregateFunction.WindowFunction)` variant so that I
>>> can retrieve the key
>>>
>>> Here’s the strange thing, I only change the source (and each source uses
>>> the same deserialization function) but:
>>>
>>>
>>>- When I use either Kafka or my Mock source, the WindowFunction gets
>>>called as events pass the end of the window
>>>- When I use the Kinesis source, however, the window function never
>>>gets called. I have even tried injecting events into kinesis with really
>>>high timestamps to flush the watermarks in my
>>>BoundedOutOfOrdernessTimestampExtractor... but nothing
>>>
>>> I cannot see how this source switching could result in such a different
>>> behaviour:
>>>
>>> Properties sourceProperties = new Properties();
>>> ConsumerFactory sourceFactory;
>>> String sourceName = configParams.getRequired("source");
>>> switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>>> case "kinesis":
>>> sourceFactory = FlinkKinesisConsumer::new;
>>> copyOptionalArg(configParams, "aws-region",
>>> sourceProperties, AWSConfigConstants.AWS_REGION);
>>> copyOptionalArg(configParams, "aws-endpoint",
>>> sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>>> copyOptionalArg(configParams, "aws-access-key",
>>> sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>>> copyOptionalArg(configParams, "aws-secret-key",
>>> sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>>> copyOptionalArg(configParams, "aws-profile",
>>> sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>>> break;
>>> case "kafka":
>>> sourceFactory = FlinkKafkaConsumer010::new;
>>> copyRequiredArg(configParams, "bootstrap-server",
>>> sourceProperties, "bootstrap.servers");
>>> copyOptionalArg(configParams, "group-id",
>>> sourceProperties, "group.id");
>>> break;
>>> case "mock":
>>> sourceFactory = MockSourceFunction::new;
>>> break;
>>> default:
>>> throw new RuntimeException("Unknown source '" +
>>> sourceName + '\'');
>>> }
>>>
>>> // set up the streaming execution environment
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> // poll watermark every second because using
>>> BoundedOutOfOrdernessTimestampExtractor
>>> env.getConfig().setAutoWatermarkInterval(1000L);
>>> env.enableCheckpointing(5000);
>>>
>>> SplitStream eventsByType =
>>> env.addSource(sourceFactory.create(
>>> configParams.getRequired("topic"),
>>> new 

Re: Broadcast state before events stream consumption

2019-02-21 Thread Dawid Wysakowicz
Hi Averell,

BroadcastState is a special case of OperatorState. Operator state is
always kept in-memory at runtime (must fit into memory), no matter what
state backend you use. Nevertheless it is snapshotted and thus fault
tolerant.

Best,

Dawid

On 21/02/2019 11:50, Averell wrote:
> Hi Konstantin,
>
> The statement below is mentioned at the end of the page 
> broadcast_state.html#important-considerations
> 
>   
> /"No RocksDB state backend: Broadcast state is kept in-memory at runtime and
> memory provisioning should be done accordingly. This holds for all operator
> states."/
>
> I am using the RocksDB state backend, and am confused by that statement and
> yours.
>
> Could you please help clarify?
>
> Thanks and regards,
> Averell
>  
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



signature.asc
Description: OpenPGP digital signature


Re: Broadcast state before events stream consumption

2019-02-21 Thread Averell
Hi Konstantin,

The statement below is mentioned at the end of the page 
broadcast_state.html#important-considerations

  
/"No RocksDB state backend: Broadcast state is kept in-memory at runtime and
memory provisioning should be done accordingly. This holds for all operator
states."/

I am using the RocksDB state backend, and am confused by that statement and
yours.

Could you please help clarify?

Thanks and regards,
Averell
 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Stream enrichment with static data, side inputs for DataStream

2019-02-21 Thread Averell
Hi Artur,

Is it possible to make that "static" stream a KeyedStream based on that
foreign key?
If yes, then just connect the two streams, keyed on that foreign key. In the
CoProcessFunction, for every record from the static stream, you write
its content into a ValueState; and for every record from the main stream,
you read the enrichment data from the saved ValueState to enrich that
main-stream record.
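
A minimal sketch of that keyed connect + CoProcessFunction approach (the tuple types and field positions are illustrative) might look like:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: main stream = (foreignKey, payload), "static" stream = (foreignKey, enrichment).
// Both streams are keyed on the foreign key before being connected.
class EnrichmentFunction
        extends CoProcessFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple3<String, String, String>> {

    private transient ValueState<String> enrichment;

    @Override
    public void open(Configuration parameters) {
        enrichment = getRuntimeContext().getState(
                new ValueStateDescriptor<>("enrichment", String.class));
    }

    @Override
    public void processElement1(Tuple2<String, String> event,
                                Context ctx,
                                Collector<Tuple3<String, String, String>> out) throws Exception {
        // Main stream: look up whatever the static stream stored for this key
        // (may still be null if no enrichment record has arrived yet).
        out.collect(Tuple3.of(event.f0, event.f1, enrichment.value()));
    }

    @Override
    public void processElement2(Tuple2<String, String> reference,
                                Context ctx,
                                Collector<Tuple3<String, String, String>> out) throws Exception {
        // "Static" stream: remember the latest enrichment value for this key.
        enrichment.update(reference.f1);
    }
}

It would be wired up with something like mainStream.connect(staticStream).keyBy(e -> e.f0, r -> r.f0).process(new EnrichmentFunction()).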

If no, then I am having the same issue :D I am looking at Broadcast State, but
there is still something that doesn't look right to me.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: StreamingFileSink causing AmazonS3Exception

2019-02-21 Thread Padarn Wilson
Thanks Kostas!

On Mon, Feb 18, 2019 at 5:10 PM Kostas Kloudas 
wrote:

> Hi Padarn,
>
> This is the jira issue:  https://issues.apache.org/jira/browse/FLINK-11187
> and the fix, as you can see, was first included in version 1.7.2.
>
> Cheers,
> Kostas
>
> On Mon, Feb 18, 2019 at 3:49 AM Padarn Wilson 
> wrote:
>
>> Hi Addison, Kostas, Steffan,
>>
>> I am also encountering this exact issue. I cannot find a JIRA ticket on
>> this, is there some planned work on implementing a fix?
>>
>> @Addison - Did you manage to find a fix that you could apply without
>> modifying the Flink codebase? If possible it would be better not to patch the
>> code base and compile a custom image.
>>
>> Thanks,
>> Padarn
>>
>> On Tue, Dec 18, 2018 at 5:37 AM Addison Higham 
>> wrote:
>>
>>> Oh this is timely!
>>>
>>> I hope I can save you some pain Kostas! (cc-ing to flink dev to get
>>> feedback there for what I believe to be a confirmed bug)
>>>
>>>
>>> I was just about to open up a flink issue for this after digging
>>> (really) deep and figuring out the issue over the weekend.
>>>
>>> The problem arises due to how Flink hands input streams to the
>>> S3AccessHelper. If you turn on debug logs for S3, you will eventually see
>>> this stack trace:
>>>
>>> 2018-12-17 05:55:46,546 DEBUG
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  -
>>> FYI: failed to reset content inputstream before throwing up
>>> java.io.IOException: Resetting to invalid mark
>>>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>>>   at
>>> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>>>   at
>>> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>>>   at
>>> 

Questions about tumbling timeWindow boundaries and late data in Flink

2019-02-21 Thread hery168
Today I read an article online: https://blog.csdn.net/xsdxs/article/details/82415450#commentsedit

Many thanks to the author for the write-up; it is quite good, but I don't fully understand some of its content.
From the article's section
3.1.3 Output

1 -> 10005 -> 2001-09-09 09:47:30.000
2 -> 100054000 -> 2001-09-09 09:47:34.000
3 -> 100079900 -> 2001-09-09 09:47:59.900
(a,12,1)
4 -> 10012 -> 2001-09-09 09:48:40.000
(a,3,100079900)
5 -> 100111000 -> 2001-09-09 09:48:31.000
6 -> 100089000 -> 2001-09-09 09:48:09.000
(b,5,100111000)
(a,4,10012)

The code calls resStream.print(); the reduce function processes the elements and returns Tuple3.of(value1.f0, value1.f1 + ""
+ value2.f1, 1L); yet the printed output above also contains (a,3,100079900)
(b,5,100111000)
(a,4,10012)
How are these original records output? Do they go through the reduce function?
Hoping someone can help explain this.
Finally, thanks again to the article's author.

Re: Data volume question

2019-02-21 Thread ????





------------------ Original message ------------------
From: "????"
Date: Feb 21, 2019, 4:39 PM
To: "user-zh@flink.apache.org";
Cc: "user-zh@flink.apache.org";
Subject: Re: Data volume question




https://issues.apache.org/jira/browse/FLINK-7001
On Feb 21, 2019 at 16:34, <1543332...@qq.com> wrote:
You could use the continuous trigger API, or implement this with a ProcessFunction.




------------------ Original message ------------------
From: "cousin-gmail";
Date: Feb 21, 2019, 4:31 PM
To: "user-zh";

Subject: Data volume question



Hey everyone, I'd like to ask: with roughly 150 million records per day, what is a good
way to query the aggregated count over the most recent hour? If I use a 1-hour window
that slides every 10 seconds, it seems I would need to maintain 360 windows; in that
case, roughly how much memory would it need?

Re: Data volume question

2019-02-21 Thread ????????

https://issues.apache.org/jira/browse/FLINK-7001
On Feb 21, 2019 at 16:34, <1543332...@qq.com> wrote:
You could use the continuous trigger API, or implement this with a ProcessFunction.




------------------ Original message ------------------
From: "cousin-gmail";
Date: Feb 21, 2019, 4:31 PM
To: "user-zh";

Subject: Data volume question



Hey everyone, I'd like to ask: with roughly 150 million records per day, what is a good
way to query the aggregated count over the most recent hour? If I use a 1-hour window
that slides every 10 seconds, it seems I would need to maintain 360 windows; in that
case, roughly how much memory would it need?

Re: Data volume question

2019-02-21 Thread ????
You could use the continuous trigger API, or implement this with a ProcessFunction.




------------------ Original message ------------------
From: "cousin-gmail";
Date: Feb 21, 2019, 4:31 PM
To: "user-zh";

Subject: Data volume question



Hey everyone, I'd like to ask: with roughly 150 million records per day, what is a good
way to query the aggregated count over the most recent hour? If I use a 1-hour window
that slides every 10 seconds, it seems I would need to maintain 360 windows; in that
case, roughly how much memory would it need?
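
A minimal sketch of the continuous-trigger variant (the (key, count) record layout and the placeholder source are illustrative; note that a tumbling window with a continuous trigger emits the running count of the current hour, not a true trailing one-hour window, which is where the ProcessFunction route gives full control):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

// Sketch: one 1-hour tumbling window per key that fires an updated count every
// 10 seconds, instead of maintaining 360 overlapping 1h/10s sliding windows.
public class HourlyCountJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("key-a", 1L), Tuple2.of("key-b", 1L))  // placeholder source
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                .sum(1)
                .print();

        env.execute("hourly-count-sketch");
    }
}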

Data volume question

2019-02-21 Thread cousin-gmail
 Hey everyone, I'd like to ask: with roughly 150 million records per day, and needing
to query the aggregated count over the most recent hour, what would be a good approach?
If I use a 1-hour window that slides every 10 seconds, it seems I would need to maintain
360 windows; in that case, roughly how much memory would that take?



Re: How to debug difference between Kinesis and Kafka

2019-02-21 Thread Dawid Wysakowicz
Hi Stephen,

The Watermark for a single operator is the minimum of the Watermarks received
from all inputs; therefore, if one of your shards/operators does not have
incoming data, it will not produce Watermarks and thus the Watermark of the
WindowOperator will not progress. So this is sort of expected behavior.

I recommend reading the docs linked by Congxian, especially this section[1].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams

On 19/02/2019 14:31, Stephen Connolly wrote:
> Hmmm my suspicions are now quite high. I created a file source that
> just replays the events straight then I get more results
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly
>  > wrote:
>
> Hmmm after expanding the dataset such that there was additional
> data that ended up on shard-0 (everything in my original dataset
> was coincidentally landing on shard-1) I am now getting output...
> should I expect this kind of behaviour if no data arrives at
> shard-0 ever?
>
> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly
>  > wrote:
>
> Hi, I’m having a strange situation and I would like to know
> where I should start trying to debug.
>
> I have set up a configurable swap in source, with three
> implementations:
>
> 1. A mock implementation
> 2. A Kafka consumer implementation
> 3. A Kinesis consumer implementation
>
> From injecting a log and no-op map function I can see that all
> three sources pass through the events correctly.
>
> I then have a window based on event time stamps… and from
> inspecting the aggregation function I can see that the data is
> getting aggregated…, I’m using the
> `.aggregate(AggregateFunction.WindowFunction)` variant so that
> I can retrieve the key
>
> Here’s the strange thing, I only change the source (and each
> source uses the same deserialization function) but:
>
>   * When I use either Kafka or my Mock source, the
> WindowFunction gets called as events pass the end of the
> window
>   * When I use the Kinesis source, however, the window
> function never gets called. I have even tried injecting
> events into kinesis with really high timestamps to flush
> the watermarks in my
> BoundedOutOfOrdernessTimestampExtractor... but nothing
>
> I cannot see how this source switching could result in such a
> different behaviour:
>
>         Properties sourceProperties = new Properties();
>         ConsumerFactory sourceFactory;
>         String sourceName = configParams.getRequired("source");
>         switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>             case "kinesis":
>                 sourceFactory = FlinkKinesisConsumer::new;
>                 copyOptionalArg(configParams, "aws-region",
> sourceProperties, AWSConfigConstants.AWS_REGION);
>                 copyOptionalArg(configParams, "aws-endpoint",
> sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>                 copyOptionalArg(configParams,
> "aws-access-key", sourceProperties,
> AWSConfigConstants.AWS_ACCESS_KEY_ID);
>                 copyOptionalArg(configParams,
> "aws-secret-key", sourceProperties,
> AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>                 copyOptionalArg(configParams, "aws-profile",
> sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>                 break;
>             case "kafka":
>                 sourceFactory = FlinkKafkaConsumer010::new;
>                 copyRequiredArg(configParams,
> "bootstrap-server", sourceProperties, "bootstrap.servers");
>                 copyOptionalArg(configParams, "group-id",
> sourceProperties, "group.id ");
>                 break;
>             case "mock":
>                 sourceFactory = MockSourceFunction::new;
>                 break;
>             default:
>                 throw new RuntimeException("Unknown source '"
> + sourceName + '\'');
>         }
>
>         // set up the streaming execution environment
>         final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // poll watermark every second because using
> BoundedOutOfOrdernessTimestampExtractor
>         env.getConfig().setAutoWatermarkInterval(1000L);
>         env.enableCheckpointing(5000);
>
>         SplitStream eventsByType =
> env.addSource(sourceFactory.create(
>         

[flink :: connected-streams :: integration-tests]

2019-02-21 Thread Rinat
Hi mates, I'm having some trouble implementing integration tests for a job
based on connected streams.

It has the following logic:
I have two streams: the first one is a stream of rules, and the other one is a stream
of events.
To apply the events to each rule, I've implemented a KeyedBroadcastProcessFunction
that broadcasts the set of rules received from the rules stream:
in processBroadcastElement I update the broadcast state;
in processElement I evaluate all rules from the broadcast state against the
input event (a skeleton of this pattern is sketched just below).
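
For reference, a minimal skeleton of that pattern (the rule/event types and the matching logic are placeholders, not the actual job):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

// Sketch: rules are (ruleId, expression) pairs, events and results are plain strings.
class RuleEvaluator extends KeyedBroadcastProcessFunction<String, String, Tuple2<String, String>, String> {

    static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    @Override
    public void processBroadcastElement(Tuple2<String, String> rule, Context ctx, Collector<String> out) throws Exception {
        // Every parallel instance receives every rule and stores it in broadcast state.
        ctx.getBroadcastState(RULES).put(rule.f0, rule.f1);
    }

    @Override
    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // Evaluate the event against all rules currently in broadcast state.
        for (Map.Entry<String, String> rule : ctx.getBroadcastState(RULES).immutableEntries()) {
            out.collect(rule.getKey() + " applied " + rule.getValue() + " to " + event);
        }
    }
}

The wiring would be along the lines of events.keyBy(...).connect(rules.broadcast(RuleEvaluator.RULES)).process(new RuleEvaluator()).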
 
I would like to implement an integration test that sends a rule into the
pipeline and then, once it has been added to the broadcast state, sends an event
and checks that the output item is the result of the rule evaluation.

For the tests, I've replaced the source functions with FromElementsFunction,
which gives me bounded streams with pre-defined items that are passed through
the pipeline.

But I can't figure out how to make sure the rules arrive before the events are
sent. Maybe you know some practices or workarounds to achieve such behaviour,
or maybe I'm doing something wrong?

Another approach that would also work for me is to initialize the broadcast
state manually on job startup, but I still can't find a way to do that.

Thanks for your advice.

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: Jira issue Flink-11127

2019-02-21 Thread Konstantin Knauf
Hi Boris,

the exact command depends on the docker-entrypoint.sh script and the image
you are using. For the example contained in the Flink repository it is
"task-manager", I think. The important thing is to pass "taskmanager.host"
to the Taskmanager process. You can verify by checking the Taskmanager
logs. These should contain lines like below:

2019-02-21 08:03:00,004 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -  Program
Arguments:
2019-02-21 08:03:00,008 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
-Dtaskmanager.host=10.12.10.173

In the Jobmanager logs you should see that the Taskmanager is registered
under the IP above in a line similar to:

2019-02-21 08:03:26,874 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID a0513ba2c472d2d1efc07626da9c1bda
(akka.tcp://flink@10.12.10.173:46531/user/taskmanager_0) at ResourceManager

A service per Taskmanager is not required. The purpose of the config
parameter is that the Jobmanager addresses the taskmanagers by IP instead
of hostname.

Hope this helps!

Cheers,

Konstantin



On Wed, Feb 20, 2019 at 4:37 PM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> Also, The suggested workaround does not quite work.
> 2019-02-20 15:27:43,928 WARN akka.remote.ReliableDeliverySupervisor -
> Association with remote system [
> akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is
> now gated for [50] ms. Reason: [Association failed with [
> akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by:
> [flink-taskmanager-1: No address associated with hostname]
> 2019-02-20 15:27:48,750 ERROR
> org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler
> - Caught exception
>
> I think the problem is that it's trying to connect to flink-taskmanager-1
>
> Using busybox to experiment with nslookup, I can see
> / # nslookup flink-taskmanager-1.flink-taskmanager
> Server:10.0.11.151
> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>
> Name:  flink-taskmanager-1.flink-taskmanager
> Address 1: 10.131.2.136
> flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
> / # nslookup flink-taskmanager-1
> Server:10.0.11.151
> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>
> nslookup: can't resolve 'flink-taskmanager-1'
> / # nslookup flink-taskmanager-0.flink-taskmanager
> Server:10.0.11.151
> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>
> Name:  flink-taskmanager-0.flink-taskmanager
> Address 1: 10.131.0.111
> flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
> / # nslookup flink-taskmanager-0
> Server:10.0.11.151
> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>
> nslookup: can't resolve 'flink-taskmanager-0'
> / #
>
> So the name should be suffixed with the service name. How do I force it?
> I suspect I am missing a config parameter.
>
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> On Feb 19, 2019, at 4:33 AM, Konstantin Knauf 
> wrote:
>
> Hi Boris,
>
> the solution is actually simpler than it sounds from the ticket. The only
> thing you need to do is to set the "taskmanager.host" to the Pod's IP
> address in the Flink configuration. The easiest way to do this is to pass
> this config dynamically via a command-line parameter.
>
> The Deployment spec could looks something like this:
>
> containers:
> - name: taskmanager
>   [...]
>   args:
>   - "taskmanager.sh"
>   - "start-foreground"
>   - "-Dtaskmanager.host=$(K8S_POD_IP)"
>   [...]
>
>   env:
>   - name: K8S_POD_IP
> valueFrom:
>   fieldRef:
> fieldPath: status.podIP
>
>
> Hope this helps and let me know if this works.
>
> Best,
>
> Konstantin
>
> On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
>> I was looking at this issue
>> https://issues.apache.org/jira/browse/FLINK-11127
>> Apparently there is a workaround for it.
>> Is it possible to provide the complete helm chart for it?
>> Bits and pieces are in the ticket, but it would be nice to see the full
>> chart
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>>
>
> --
> Konstantin Knauf | Solutions Architect
> +49 160 91394525
>
> 
>
> Follow us @VervericaData
> --
> Join Flink Forward  - The Apache Flink
> Conference
> Stream Processing | Event Driven | Real Time
> --
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference