Re: MaxMetaspace default may be too low?

2020-02-24 Thread Xintong Song
In that case, I think the default metaspace size is too small for your
setup. The default configurations are not intended for such large task
managers.

In Flink 1.8 we do not set the JVM '-XX:MaxMetaspaceSize' parameter, which
means you have 'unlimited' metaspace size. We changed that in Flink 1.10 to
have stricter control on the overall memory usage of Flink processes.

Thank you~

Xintong Song



On Tue, Feb 25, 2020 at 1:24 PM John Smith  wrote:

> I would like to also add that the same exact jobs on Flink 1.8 were running
> perfectly fine.
>
> On Tue, 25 Feb 2020 at 00:20, John Smith  wrote:
>
>> Right after Job execution. Basically as soon as I deployed a 5th job. So
>> at 4 jobs it was ok, at 5 jobs it would take like 1-2 minutes max and the
>> node would just shut off.
>> So far with MaxMetaSpace 256m it's been stable. My task nodes are 16GB
>> and the memory config is done as follows...
>> taskmanager.memory.flink.size: 12g
>> taskmanager.memory.jvm-metaspace.size: 256m
>>
>> 100% of the jobs right now are ETL with checkpoints, NO state,
>> Kafka -> Json Transform > DB
>> or
>> Kafka > DB lookup (to small local cache)> Json Transform
>> -> Apache Ignite
>>
>> None of the jobs are related.
>>
>> On Mon, 24 Feb 2020 at 20:59, Xintong Song  wrote:
>>
>>> Hi John,
>>>
>>> The default metaspace size is intended to work for a major proportion
>>> of jobs. We are aware that for some jobs that need to load lots of classes,
>>> the default value might not be large enough. However, having a larger
>>> default value means for other jobs that do not load many classes, the
>>> overall memory requirements might be unnecessarily high. (Imagine you have
>>> a task manager with the default total memory 1.5GB, but 512m of it is
>>> reserved for metaspace.)
>>>
>>> Another possible problem is metaspace leak. When you say "eventually
>>> task nodes started shutting down with OutOfMemory Metaspace", does this
>>> problem happen shortly after the job execution starts, or does it happen
>>> after the job has been running for a while? Does the metaspace footprint keep growing or
>>> become stable after the initial growth? If the metaspace keeps growing
>>> along with time, it's usually an indicator of metaspace memory leak.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Feb 25, 2020 at 7:50 AM John Smith 
>>> wrote:
>>>
 Hi, I just upgraded to 1.10 and I started deploying my jobs. Eventually
 task nodes started shutting down with OutOfMemory Metaspace.

 I look at the logs and the task managers are started with:
 -XX:MaxMetaspaceSize=100663296

 So I configed: taskmanager.memory.jvm-metaspace.size: 256m

 It seems to be ok for now. What are your thoughts? And should I try
 512m or is that too much?

>>>


state schema evolution for case classes

2020-02-24 Thread ApoorvK
Hi Team,

We originally developed on Flink 1.6.2, so there are lots of case classes
which have Map and nested case class fields within them, for example:

case class MyCaseClass(var a: Boolean,
                       var b: Boolean,
                       var c: Boolean,
                       var d: NestedCaseClass,
                       var e: Int) {
  def this() { this(false, false, false, new NestedCaseClass, 0) }
}


Now we have migrated to Flink 1.8.2, and I need help figuring out how I can
achieve state schema evolution for such classes.

1. If I create Avro schemas for these classes now and implement Avro serialisation, will that work?
2. Or should I register a Kryo serialiser with a protobuf serialiser on the environment?

Please suggest what can be done here, or point me to an Avro serialisation example.
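
As a possible illustration of option 1 (not from the original thread, only a sketch): if an Avro-generated SpecificRecord class mirroring MyCaseClass is used as the state type, Flink's Avro serializer can, as far as I know, evolve the state schema when fields are added or removed. The class name MyCaseClassAvro below is hypothetical.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

// inside a keyed RichFunction / ProcessFunction, e.g. in open():
// MyCaseClassAvro is a hypothetical Avro-generated SpecificRecord mirroring MyCaseClass
ValueStateDescriptor<MyCaseClassAvro> descriptor =
    new ValueStateDescriptor<>("my-state", MyCaseClassAvro.class);
ValueState<MyCaseClassAvro> state = getRuntimeContext().getState(descriptor);

Option 2 (registering a Kryo serializer) does not, to my knowledge, give you schema evolution, so option 1 is usually the safer path.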

Thanks




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Timeseries aggregation with many IoT devices off of one Kafka topic.

2020-02-24 Thread theo.diefent...@scoop-software.de
Hi,

At the last Flink Forward in Berlin I spoke with some people about the same problem, where they had construction devices as IoT sensors which could even be offline for multiple days. They told me that the major problem for them was that the watermark in Flink is maintained per operator/subtask, even if you group by key. They solved their problem via a Flink process function, where they have full control over state and timers, so you can deal with each device as you like and can e.g. maintain something similar to a per-device watermark. I also think that is the best way to go for this use case.

Best regards
Theo

-------- Original message --------
From: hemant singh
Date: Tue, Feb 25, 2020, 06:19
To: Marco Villalobos
Cc: user@flink.apache.org
Subject: Re: Timeseries aggregation with many IoT devices off of one Kafka topic.

Hello,

I am also working on something similar. Below is the pipeline design I have; sharing it as it may be helpful.

topic -> keyed stream on device-id -> window operation -> sink.

You can PM me for further details.

Thanks,
Hemant

On Tue, Feb 25, 2020 at 1:54 AM Marco Villalobos  wrote:







I need to collect timeseries data from thousands of IoT devices. Each device has a name, value, and timestamp published to one Kafka topic.  The event time timestamps are in order only in relation to the individual device, but out of order with respect to other devices.
 
Is there a way to aggregate a 15-minute window of this in which each IoT device gets aggregated with its own event time?
 
I would deeply appreciate if somebody could guide me to an approach for solving this in Flink.
 
I wish there was a group chat for these types of problems.
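
As a rough illustration of the approach Theo describes above (full control over per-key state and timers in a KeyedProcessFunction), here is a minimal, untested sketch. The Reading type, the 15-minute window size and the choice of event-time timers are all assumptions; whether event-time or processing-time timers fit better depends on how long devices may stay offline.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PerDeviceAggregator extends KeyedProcessFunction<String, PerDeviceAggregator.Reading, String> {

    // Assumed input record: one IoT measurement.
    public static class Reading {
        public String deviceId;
        public double value;
        public long timestamp; // event time, millis
    }

    private static final long WINDOW_MS = 15 * 60 * 1000L;

    private transient ListState<Reading> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>("buffer", Reading.class));
    }

    @Override
    public void processElement(Reading r, Context ctx, Collector<String> out) throws Exception {
        buffer.add(r);
        // Register a timer for the end of the 15-minute window that this element's own timestamp falls into.
        long windowEnd = (r.timestamp / WINDOW_MS + 1) * WINDOW_MS;
        ctx.timerService().registerEventTimeTimer(windowEnd);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Aggregate everything buffered for this device and emit one record per fired window.
        double sum = 0;
        int count = 0;
        String deviceId = null;
        for (Reading r : buffer.get()) {
            sum += r.value;
            count++;
            deviceId = r.deviceId;
        }
        if (count > 0) {
            out.collect(deviceId + ",windowEnd=" + timestamp + ",avg=" + (sum / count));
        }
        buffer.clear();
    }
}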

 






Re: Kerberos authentication exception when connecting Flink 1.10 to Hive

2020-02-24 Thread sunfulin


Hi,
While configuring Flink to connect to Hive, since the cluster has Kerberos authentication enabled, after some exploration the earlier exception is gone. But now I am asked to enter a Kerberos username and password when connecting. My understanding is that once the keytab file path is specified, a username and password should no longer be needed. Could anyone point out what configuration might be wrong?


security.kerberos.login.use-ticker-cache: false
security.kerberos.login.keytab: 
/app/flink/flink-1.10.10/kerberos/flink_test.keytab
security.kerberos.login.principal: flink_t...@hadoop.htsc.com









At 2020-02-21 18:18:57, "sunfulin"  wrote:

Hi,
I am integrating Hive with Flink 1.10. When connecting to the metastore, since the CDH cluster backing Hive has Kerberos authentication enabled, the following exception is thrown. Could anyone tell me how to configure or resolve this?


999  [main] INFO  hive.metastore  - Trying to connect to metastore with URI 
thrift://namenode01.htsc.com:9083
1175 [main] ERROR org.apache.thrift.transport.TSaslTransport  - SASL 
negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]
  at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
  at 
org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
  at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271)
  at 
org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:420)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:236)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:181)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:118)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at 
org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:43)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:71)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
  at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:188)
  at 
org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:102)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:235)
  at 
com.htsc.crm_realtime.fatjob.Jobs.hive.HiveMetaJob.doJob(HiveMetaJob.java:44)
  at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50)
  at 
com.htsc.crm_realtime.fatjob.Jobs.hive.HiveMetaJob.main(HiveMetaJob.java:23)
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed 
to find any Kerberos tgt)
  at 
sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
  at 
sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
  at 
sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
  at 
sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)
  at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
  at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
  at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
  ... 34 more




 

Re: MaxMetaspace default may be too low?

2020-02-24 Thread John Smith
I would like to also add that the same exact jobs on Flink 1.8 were running
perfectly fine.

On Tue, 25 Feb 2020 at 00:20, John Smith  wrote:

> Right after Job execution. Basically as soon as I deployed a 5th job. So
> at 4 jobs it was ok, at 5 jobs it would take like 1-2 minutes max and the
> node would just shut off.
> So far with MaxMetaSpace 256m it's been stable. My task nodes are 16GB and
> the memory config is done as follows...
> taskmanager.memory.flink.size: 12g
> taskmanager.memory.jvm-metaspace.size: 256m
>
> 100% of the jobs right now are ETL with checkpoints, NO state,
> Kafka -> Json Transform > DB
> or
> Kafka > DB lookup (to small local cache)> Json Transform
> -> Apache Ignite
>
> None of the jobs are related.
>
> On Mon, 24 Feb 2020 at 20:59, Xintong Song  wrote:
>
>> Hi John,
>>
>> The default metaspace size is intended to work for a major proportion
>> of jobs. We are aware that for some jobs that need to load lots of classes,
>> the default value might not be large enough. However, having a larger
>> default value means for other jobs that do not load many classes, the
>> overall memory requirements might be unnecessarily high. (Imagine you have
>> a task manager with the default total memory 1.5GB, but 512m of it is
>> reserved for metaspace.)
>>
>> Another possible problem is metaspace leak. When you say "eventually task
>> nodes started shutting down with OutOfMemory Metaspace", does this problem
>> happen shortly after the job execution starts, or does it happen after the
>> job has been running for a while? Does the metaspace footprint keep growing or become
>> stable after the initial growth? If the metaspace keeps growing along with
>> time, it's usually an indicator of metaspace memory leak.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Feb 25, 2020 at 7:50 AM John Smith 
>> wrote:
>>
>>> Hi, I just upgraded to 1.10 and I started deploying my jobs. Eventually
>>> task nodes started shutting down with OutOfMemory Metaspace.
>>>
>>> I look at the logs and the task managers are started with:
>>> -XX:MaxMetaspaceSize=100663296
>>>
>>> So I configed: taskmanager.memory.jvm-metaspace.size: 256m
>>>
>>> It seems to be ok for now. What are your thoughts? And should I try 512m
>>> or is that too much?
>>>
>>


Re: MaxMetaspace default may be too low?

2020-02-24 Thread John Smith
Right after Job execution. Basically as soon as I deployed a 5th job. So at
4 jobs it was ok, at 5 jobs it would take like 1-2 minutes max and the node
would just shut off.
So far with MaxMetaSpace 256m it's been stable. My task nodes are 16GB and
the memory config is done as follows...
taskmanager.memory.flink.size: 12g
taskmanager.memory.jvm-metaspace.size: 256m

100% of the jobs right now are ETL with checkpoints, NO state,
Kafka -> Json Transform > DB
or
Kafka > DB lookup (to small local cache)> Json Transform ->
Apache Ignite

None of the jobs are related.

On Mon, 24 Feb 2020 at 20:59, Xintong Song  wrote:

> Hi John,
>
> The default metaspace size is intended to work for a major proportion
> of jobs. We are aware that for some jobs that need to load lots of classes,
> the default value might not be large enough. However, having a larger
> default value means for other jobs that do not load many classes, the
> overall memory requirements might be unnecessarily high. (Imagine you have
> a task manager with the default total memory 1.5GB, but 512m of it is
> reserved for metaspace.)
>
> Another possible problem is metaspace leak. When you say "eventually task
> nodes started shutting down with OutOfMemory Metaspace", does this problem
> happen shortly after the job execution starts, or does it happen after the
> job has been running for a while? Does the metaspace footprint keep growing or become
> stable after the initial growth? If the metaspace keeps growing along with
> time, it's usually an indicator of metaspace memory leak.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Feb 25, 2020 at 7:50 AM John Smith  wrote:
>
>> Hi, I just upgraded to 1.10 and I started deploying my jobs. Eventually
>> task nodes started shutting down with OutOfMemory Metaspace.
>>
>> I look at the logs and the task managers are started with:
>> -XX:MaxMetaspaceSize=100663296
>>
>> So I configed: taskmanager.memory.jvm-metaspace.size: 256m
>>
>> It seems to be ok for now. What are your thoughts? And should I try 512m
>> or is that too much?
>>
>


Re: Timeseries aggregation with many IoT devices off of one Kafka topic.

2020-02-24 Thread hemant singh
Hello,

I am also working on something similar. Below is the pipeline design I
have; sharing it as it may be helpful.

topic -> keyed stream on device-id -> window operation -> sink.
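
A rough sketch of that layout in the DataStream API (everything here is an assumption for illustration: the Reading POJO with public deviceId/value/timestamp fields, the already-built Kafka source and sink, the 1-minute out-of-orderness bound and the 15-minute tumbling window):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Reading> readings = env
    .addSource(kafkaSource)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Reading>(Time.minutes(1)) {
            @Override
            public long extractTimestamp(Reading r) {
                return r.timestamp;
            }
        });

readings
    .keyBy("deviceId")                                      // keyed stream on device-id
    .window(TumblingEventTimeWindows.of(Time.minutes(15)))  // window operation
    .reduce((a, b) -> { a.value += b.value; return a; })    // placeholder aggregation
    .addSink(kafkaSink);                                    // sink

Note that with a single operator-level watermark, a device lagging far behind the others can have its late events dropped, which is the per-device watermark concern raised elsewhere in this thread.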

You can PM me for further details.

Thanks,
Hemant

On Tue, Feb 25, 2020 at 1:54 AM Marco Villalobos 
wrote:

> I need to collect timeseries data from thousands of IoT devices. Each
> device has a name, value, and timestamp published to one Kafka topic.  The
> event time timestamps are in order only in relation to the individual
> device, but out of order with respect to other devices.
>
>
>
> Is there a way to aggregate a 15-minute window of this in which each IoT
> device gets aggregated with its own event time?
>
>
>
> I would deeply appreciate if somebody could guide me to an approach for
> solving this in Flink.
>
>
>
> I wish there was a group chat for these types of problems.
>
>
>


Using flink-jdbc-driver to write to MySQL (flink 1.10.0)

2020-02-24 Thread ????
Using the flink-jdbc-driver with the SQL gateway / sql client to write to a MySQL table sink, the following exception is thrown:

Caused by: org.apache.flink.table.api.TableException: RetractStreamTableSink and UpsertStreamTableSink is not supported in Batch environment.
  at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:85)
  at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:48)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
  at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:48)
  at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:69)
  at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:68)
Connection connection = 
DriverManager.getConnection("jdbc:flink://dataflow1:8083?planner=blink");
Statement statement = connection.createStatement();
sql = "CREATE TABLE t_user_target (\n" +
"  id BIGINT,\n" +
"  username VARCHAR\n" +
") WITH (\n" +
"  'connector.type' = 'jdbc',\n" +
"  'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
"  'connector.url' = 'jdbc:mysql://172.18.100.85:3306/targetdb',\n" +
"  'connector.table' = 't1target',\n" +
"  'connector.username' = 'root',\n" +
"  'connector.password' = 'root',\n" +
"  'connector.write.flush.max-rows' = '5000'\n" +
");";
statement.executeUpdate(sql);
statement.execute("insert into t_user_target values(1,'fan'),(2,'ss')");

Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-24 Thread Yang Wang
Hi M Singh,

> Mans - If we use the session based deployment option for K8 - I thought
> K8 will automatically restarts any failed TM or JM.
> In the case of failed TM - the job will probably recover, but in the case
> of failed JM - perhaps we need to resubmit all jobs.
> Let me know if I have misunderstood anything.


Since you are starting the JM/TM with K8s deployments, new JM/TM pods will be
created when they fail. If you do not set the high
availability configuration, your jobs could recover when a TM fails.
However, they could not recover when the JM fails. With HA
configured, the jobs could always be recovered and you do not need to
re-submit again.

> Mans - Is there any safe way of passing creds ?


Yes, you are right, using a configmap to pass the credentials is not safe. On
K8s, I think you could use secrets instead [1].

> Mans - Does a task manager failure cause the job to fail ?  My
> understanding is the JM failure are catastrophic while TM failures are
> recoverable.


What I mean is that the job fails, and it could be restarted by your configured
restart strategy [2].
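
As a side note (not specific to Kubernetes), the restart strategy can also be set programmatically in the job itself; a minimal sketch, with arbitrary fixed-delay values:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// retry a failed job up to 3 times, waiting 10 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));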

> Mans - So if we are saving checkpoint in S3 then there is no need for
> disks - should we use emptyDir ?


Yes, if you are saving the checkpoint in S3 and also set
`high-availability.storageDir` to S3, then you do not need a persistent
volume. Since
the local directory is only used as a local cache, you could directly use
the overlay filesystem or emptyDir (better IO performance).


[1].
https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/
[2].
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance

M Singh  wrote on Tue, Feb 25, 2020 at 5:53 AM:

> Thanks Wang for your detailed answers.
>
> From what I understand the native_kubernetes also leans towards creating a
> session and submitting a job to it.
>
> Regarding other recommendations, please see my inline comments and advice.
>
> On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang <
> danrtsey...@gmail.com> wrote:
>
>
> Hi Singh,
>
> Glad to hear that you are looking to run Flink on the Kubernetes. I am
> trying to answer your question based on my limited knowledge and
> others could correct me and add some more supplements.
>
> I think the biggest difference between session cluster and per-job cluster
> on Kubernetes is the isolation. Since for per-job, a dedicated Flink cluster
> will be started for only one job and no other jobs could be
> submitted.
> Once the job is finished, then the Flink cluster will be
> destroyed immediately.
> The second point is one-step submission. You do not need to start a Flink
> cluster first and then submit a job to the existing session.
>
> > Are there any benefits with regards to
> 1. Configuring the jobs
> No matter whether you are using the per-job cluster or submitting to the existing
> session cluster, they share the configuration mechanism. You do not have
> to change any codes and configurations.
>
> 2. Scaling the taskmanager
> Since you are using the Standalone cluster on Kubernetes, it does not provide
> an active resourcemanager. You need to use external tools to monitor and
> scale up the taskmanagers. The active integration is still evolving and you
> could have a taste[1].
>
> Mans - If we use the session based deployment option for K8 - I thought K8
> will automatically restart any failed TM or JM.
> In the case of failed TM - the job will probably recover, but in the case
> of failed JM - perhaps we need to resubmit all jobs.
> Let me know if I have misunderstood anything.
>
> 3. Restarting jobs
> For the session cluster, you could directly cancel the job and re-submit.
> And
> for per-job cluster, when the job is canceled, you need to start a new
> per-job
> cluster from the latest savepoint.
>
> 4. Managing the flink jobs
> The rest api and flink command line could be used for managing the jobs (e.g.
> flink cancel, etc.). I think there is no difference for session and
> per-job here.
>
> 5. Passing credentials (in case of AWS, etc)
> I am not sure how you provide your credentials. If you put them in the
> config map and then mount into the jobmanager/taskmanager pod, then both
> session and per-job could support this way.
>
> Mans - Is there any safe way of passing creds ?
>
> 6. Fault tolerance and recovery of jobs from failure
> For a session cluster, if one taskmanager crashed, then all the jobs which
> have tasks
> on this taskmanager will fail.
> Both session and per-job could be configured with high availability and
> recover
> from the latest checkpoint.
>
> Mans - Does a task manager failure cause the job to fail ?  My
> understanding is that JM failures are catastrophic while TM failures are
> recoverable.
>
> > Is there any need for specifying volume for the pods?
> No, you do not need to specify a volume for the pod. All the data in the pod
> local directory is temporary. When a pod crashed and relaunched, the
> taskmanager will retrieve the checkpoint from zookeeper + S3 and resume
> from the latest checkpoint.

Re: Flink job reading from and writing to Kafka with aggregation fails

2020-02-24 Thread zhisheng
Looking at the exception message  - Closing TaskExecutor connection
container_1578492316659_0830_01_06 because: Container
[pid=30031,containerID=container_1578492316659_0830_01_06] is running
beyond physical memory limits. Current usage: 10.0 GB of 10 GB physical
memory used; 11.8 GB of 21 GB virtual memory used. Killing container.

it looks like memory was exceeded and the container was killed.

chanamper  wrote on Mon, Feb 24, 2020 at 10:33 AM:

>
> Hi all, a quick question: my Flink job reads data from Kafka, performs an aggregation, and writes the results back to Kafka; the Flink version is 1.8.0. After running for a while, the job hits the following exception and then crashes. How can this problem be solved? Many thanks.
>
> 2020-02-19 10:45:45,314 ERROR 
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
> - Encountered error while consuming partitions
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
> at java.lang.Thread.run(Thread.java:748)
> 2020-02-19 10:45:45,317 INFO
> org.apache.kafka.clients.producer.KafkaProducer   - [Producer
> clientId=producer-1] Closing the Kafka producer with timeoutMillis =
> 9223372036854775807 ms.
> 2020-02-19 10:45:45,412 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Memory
> usage stats: [HEAP: 98/6912/6912 MB, NON HEAP: 81/83/-1 MB
> (used/committed/max)]
>
> 2020-02-19 10:45:45,413 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Direct
> memory stats: Count: 24596, Total Capacity: 806956211, Used Memory:
> 806956212
>
>
>
>
>
>
> 2020-02-19 10:50:31,351 WARN  akka.remote.transport.netty.NettyTransport
>   - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: aj-flinknode01/
> 9.186.36.80:56983
> 2020-02-19 10:50:31,351 WARN  akka.remote.ReliableDeliverySupervisor
>   - Association with remote system
> [akka.tcp://flink@aj-flinknode01:56983] has failed, address is now gated
> for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@aj-flinknode01:56983]] Caused by: [Connection refused:
> aj-flinknode01/9.186.36.80:56983]
> 2020-02-19 10:50:55,419 WARN  akka.remote.ReliableDeliverySupervisor
>   - Association with remote system
> [akka.tcp://flink@aj-flinknode01:45703] has failed, address is now gated
> for [50] ms. Reason: [Disassociated]
> 2020-02-19 10:50:56,370 INFO  org.apache.flink.yarn.YarnResourceManager
>  - Closing TaskExecutor connection
> container_1578492316659_0830_01_06 because: Container
> [pid=30031,containerID=container_1578492316659_0830_01_06] is running
> beyond physical memory limits. Current usage: 10.0 GB of 10 GB physical
> memory used; 11.8 GB of 21 GB virtual memory used. Killing container.
> Dump of the process-tree for container_1578492316659_0830_01_06 :
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> |- 30068 30031 30031 30031 (java) 277668 18972 12626370560 2630988
> /data/jdk1.8.0_211/bin/java -Xms6912m -Xmx6912m
> -XX:MaxDirectMemorySize=3328m -XX:+UseG1GC
> -Dlog.file=/data/hadoop-2.6.0-cdh5.16.1/logs/userlogs/application_1578492316659_0830/container_1578492316659_0830_01_06/taskmanager.log
> -Dlogback.configurationFile=file:./logback.xml
> -Dlog4j.configuration=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
> |- 30031 30027 30031 30031 (bash) 0 0 11001856 329 /bin/bash -c
> /data/jdk1.8.0_211/bin/java -Xms6912m -Xmx6912m
> -XX:MaxDirectMemorySize=3328m -XX:+UseG1GC
> 

Re: MaxMetaspace default may be too low?

2020-02-24 Thread Xintong Song
Hi John,

The default metaspace size is intended to work for a major proportion of
jobs. We are aware that for some jobs that need to load lots of classes,
the default value might not be large enough. However, having a larger
default value means for other jobs that do not load many classes, the
overall memory requirements might be unnecessarily high. (Imagine you have
a task manager with the default total memory 1.5GB, but 512m of it is
reserved for metaspace.)

Another possible problem is metaspace leak. When you say "eventually task
nodes started shutting down with OutOfMemory Metaspace", does this problem
happen shortly after the job execution starts, or does it happen after the
job has been running for a while? Does the metaspace footprint keep growing or become
stable after the initial growth? If the metaspace keeps growing along with
time, it's usually an indicator of metaspace memory leak.

Thank you~

Xintong Song



On Tue, Feb 25, 2020 at 7:50 AM John Smith  wrote:

> Hi, I just upgraded to 1.10 and I started deploying my jobs. Eventually
> task nodes started shutting down with OutOfMemory Metaspace.
>
> I look at the logs and the task managers are started with:
> -XX:MaxMetaspaceSize=100663296
>
> So I configed: taskmanager.memory.jvm-metaspace.size: 256m
>
> It seems to be ok for now. What are your thoughts? And should I try 512m
> or is that too much?
>


Re: Colocating Sub-Tasks across jobs / Sharing Task Slots across jobs

2020-02-24 Thread Xintong Song
Hi Ben,

You cannot share slots across jobs. Flink adopts a two-level slot
scheduling mechanism: slots are first allocated to each job, and then the
JobMaster decides which tasks should be executed in which slots, i.e. slot
sharing.
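
For context, within a single job the slot sharing behaviour can be steered explicitly by assigning operators to a named slot sharing group; a minimal sketch (the operator, stream and group names are made up), and it does not help across jobs:

DataStream<String> enriched = input
    .map(new EnrichMapFunction())
    .slotSharingGroup("pipeline-a"); // operators in the same group may share a slot, within this job only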

I think what you are looking for is Pipelined Region Restart Strategy [1],
which restarts only the tasks connected by pipelined edges instead of the
whole job graph.

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy



On Mon, Feb 24, 2020 at 11:28 PM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hello all!
>
> I have a setup composed of several streaming pipelines. These have
> different deployment lifecycles: I want to be able to modify and redeploy
> the topology of one while the other is still up. I am thus putting them in
> different jobs.
>
> The problem is I have a Co-Location constraint between one subtask of each
> pipeline; I'd like them to run on the same TaskSlots, much like if they
> were sharing a TaskSlot; or at least have them on the same JVM.
>
> A semi-official feature
> "DataStream.getTransformation().setCoLocationGroupKey(stringKey)" [1]
> exists for this, but seems to be tied to the Sub-Tasks actually being able
> to be co-located on the same Task Slot.
>
> The documentation mentions [2] that it might be impossible to do ("Flink
> allows subtasks to share slots even if they are subtasks of different
> tasks, so long as they are *from the same job*").
>
> The streaming pipelines are numerous (about 10), and I can't afford to
> increase the number of TaskSlots per TaskManager. I also would like to
> avoid putting all the pipelines in the same job, restarting it every time a
> single one changes.
>
> I'd like to have mailing list's informed opinion about this, if there are
> workarounds, or if I could reconsider my problem under another angle.
>
> Cheers
> Ben
>
> [1]
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java#L116
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#task-slots-and-resources
>


Re: yarn session: one JVM per task

2020-02-24 Thread Xintong Song
Depending on your Flink version, the '-n' option might not take effect. It
is removed in the latest release, but before that there were a few versions
where this option was neither removed nor taking effect.

Anyway, as long as you have multiple containers, I don't think there's a
way to ensure that some of the tasks are scheduled to the same JVM. Not that I'm aware
of.


Thank you~

Xintong Song



On Mon, Feb 24, 2020 at 8:43 PM David Morin 
wrote:

> Hi,
>
> Thanks Xintong.
> I've noticed that when I use yarn-session.sh with the --slots (-s) parameter
> but without --container (-n), it creates one task slot per taskmanager.
> Before, with both -n and -s, that was not the case.
> I prefer to use only small containers with a single task slot each, to scale my
> pipeline and of course to avoid thread-safety issues.
> Do you think I cannot be confident in that behaviour?
>
> Regards,
> David
>
> On 2020/02/22 17:11:25, David Morin  wrote:
> > Hi,
> > My app is based on a lib that is not thread safe (yet...).
> > While waiting for the patch to be pushed, how can I be sure that my Sink
> > that uses this lib runs in one JVM ?
> > Context: I use one Yarn session and send my Flink jobs to this session
> >
> > Regards,
> > David
> >
>


MaxMetaspace default may be too low?

2020-02-24 Thread John Smith
Hi, I just upgraded to 1.10 and I started deploying my jobs. Eventually
task nodes started shutting down with OutOfMemory Metaspace.

I look at the logs and the task managers are started with:
-XX:MaxMetaspaceSize=100663296

So I configed: taskmanager.memory.jvm-metaspace.size: 256m

It seems to be ok for now. What are your thoughts? And should I try 512m or
is that too much?
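
For reference (my own arithmetic, not from the original mail), the value in that JVM flag is simply a metaspace size expressed in bytes:

    100663296 bytes = 96 * 1024 * 1024 bytes = 96 MiB

which, if I am not mistaken, matches the Flink 1.10 default of taskmanager.memory.jvm-metaspace.size: 96m.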


Re: MODERATE for d...@flink.apache.org

2020-02-24 Thread Henry Saputra
Hi Sri,

Thank you for your interest with Apache Flink. To continue to interact with
people in the mailing list, please subscribe to the list [1] to make sure
your posts are delivered to the right list.

Thanks,

Henry


[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list


>
> -- Forwarded message --
> From: sri hari kali charan Tummala 
> To: user , d...@flink.apache.org
> Cc:
> Bcc:
> Date: Mon, 24 Feb 2020 16:25:59 -0600
> Subject: Batch Flink Job S3 write performance vs Spark
> Hi All,
>
> I have a question: has anyone compared the performance of a Flink batch job
> writing to S3 vs Spark writing to S3?
>
> --
> Thanks & Regards
> Sri Tummala
>
>


Batch Flink Job S3 write performance vs Spark

2020-02-24 Thread sri hari kali charan Tummala
Hi All,

I have a question: has anyone compared the performance of a Flink batch job
writing to S3 vs Spark writing to S3?

-- 
Thanks & Regards
Sri Tummala


Re: AWS Client Builder with default credentials

2020-02-24 Thread Suneel Marthi
Not sure if this helps - this is how I invoke a Sagemaker endpoint model
from a flink pipeline.

See
https://github.com/smarthi/NMT-Sagemaker-Inference/blob/master/src/main/java/de/dws/berlin/util/AwsUtil.java



On Mon, Feb 24, 2020 at 10:08 AM David Magalhães 
wrote:

> Hi Robert, thanks for your reply.
>
> GlobalConfiguration.loadConfiguration was useful to check if a
> flink-conf.yml file was on resources, for the integration tests that I'm
> doing. On the cluster I will use the default configurations.
>
> On Fri, Feb 21, 2020 at 10:58 AM Robert Metzger 
> wrote:
>
>> There are multiple ways of passing configuration parameters to your user
>> defined code in Flink
>>
>> a)  use getRuntimeContext().getUserCodeClassLoader().getResource() to
>> load a config file from your user code jar or the classpath.
>> b)  use
>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters() to
>> access a configuration object serialized from the main method.
>> you can pass a custom object to the job parameters, or use Flink's
>> "Configuration" object in your main method:
>>
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> Configuration config = new Configuration();
>> config.setString("foo", "bar");
>> env.getConfig().setGlobalJobParameters(config);
>>
>> c) Load the flink-conf.yaml:
>>
>> Configuration conf = GlobalConfiguration.loadConfiguration();
>>
>> I'm not 100% sure if this approach works, as it is not intended to be
>> used in user code (I believe).
>>
>>
>> Let me know if this helps!
>>
>> Best,
>> Robert
>>
>> On Thu, Feb 20, 2020 at 1:50 PM Chesnay Schepler 
>> wrote:
>>
>>> First things first, we do not intend for users to use anything in the S3
>>> filesystem modules except the filesystems itself,
>>> meaning that you're somewhat treading on unsupported ground here.
>>>
>>> Nevertheless, the S3 modules contain a large variety of AWS-provided
>>> CredentialsProvider implementations,
>>> that can derive credentials from environment variables, system
>>> properties, files on the classpath and many more.
>>>
>>> Ultimately though, you're kind of asking us how to use AWS APIs, for
>>> which I would direct you to the AWS documentation.
>>>
>>> On 20/02/2020 13:16, David Magalhães wrote:
>>>
>>> I'm using
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.client.builder.AwsClientBuilder
>>> to create a S3 client to copy objects and delete object inside
>>> a TwoPhaseCommitSinkFunction.
>>>
>>> Shouldn't there be another way to set up configurations without putting them
>>> hardcoded? Something like core-site.xml or flink-conf.yaml ?
>>>
>>> Right now I need to have them hardcoded like this.
>>>
>>> AmazonS3ClientBuilder.standard
>>>   .withPathStyleAccessEnabled(true)
>>>   .withEndpointConfiguration(
>>> new EndpointConfiguration("http://minio:9000", "us-east-1")
>>>   )
>>>   .withCredentials(
>>> new AWSStaticCredentialsProvider(new
>>> BasicAWSCredentials("minio", "minio123"))
>>>   )
>>>   .build
>>>
>>> Thanks
>>>
>>>
>>>


Re: AWS Client Builder with default credentials

2020-02-24 Thread sri hari kali charan Tummala
check this.

https://github.com/kali786516/FlinkStreamAndSql/blob/b8bcbadaa3cb6bfdae891f10ad1205e256adbc1e/src/main/scala/com/aws/examples/dynamodb/dynStreams/FlinkDynamoDBStreams.scala#L42

https://github.com/kali786516/FlinkStreamAndSql/blob/b8bcbadaa3cb6bfdae891f10ad1205e256adbc1e/src/main/scala/com/aws/examples/kinesis/consumer/transactionScalaWorkingExample/TransactionScalaConsumer.scala#L41



On Mon, Feb 24, 2020 at 9:08 AM David Magalhães 
wrote:

> Hi Robert, thanks for your reply.
>
> GlobalConfiguration.loadConfiguration was useful to check if a
> flink-conf.yml file was on resources, for the integration tests that I'm
> doing. On the cluster I will use the default configurations.
>
> On Fri, Feb 21, 2020 at 10:58 AM Robert Metzger 
> wrote:
>
>> There are multiple ways of passing configuration parameters to your user
>> defined code in Flink
>>
>> a)  use getRuntimeContext().getUserCodeClassLoader().getResource() to
>> load a config file from your user code jar or the classpath.
>> b)  use
>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters() to
>> access a configuration object serialized from the main method.
>> you can pass a custom object to the job parameters, or use Flink's
>> "Configuration" object in your main method:
>>
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> Configuration config = new Configuration();
>> config.setString("foo", "bar");
>> env.getConfig().setGlobalJobParameters(config);
>>
>> c) Load the flink-conf.yaml:
>>
>> Configuration conf = GlobalConfiguration.loadConfiguration();
>>
>> I'm not 100% sure if this approach works, as it is not intended to be
>> used in user code (I believe).
>>
>>
>> Let me know if this helps!
>>
>> Best,
>> Robert
>>
>> On Thu, Feb 20, 2020 at 1:50 PM Chesnay Schepler 
>> wrote:
>>
>>> First things first, we do not intend for users to use anything in the S3
>>> filesystem modules except the filesystems itself,
>>> meaning that you're somewhat treading on unsupported ground here.
>>>
>>> Nevertheless, the S3 modules contain a large variety of AWS-provided
>>> CredentialsProvider implementations,
>>> that can derive credentials from environment variables, system
>>> properties, files on the classpath and many more.
>>>
>>> Ultimately though, you're kind of asking us how to use AWS APIs, for
>>> which I would direct you to the AWS documentation.
>>>
>>> On 20/02/2020 13:16, David Magalhães wrote:
>>>
>>> I'm using
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.client.builder.AwsClientBuilder
>>> to create a S3 client to copy objects and delete object inside
>>> a TwoPhaseCommitSinkFunction.
>>>
>>> Shouldn't there be another way to set up configurations without putting them
>>> hardcoded? Something like core-site.xml or flink-conf.yaml ?
>>>
>>> Right now I need to have them hardcoded like this.
>>>
>>> AmazonS3ClientBuilder.standard
>>>   .withPathStyleAccessEnabled(true)
>>>   .withEndpointConfiguration(
>>> new EndpointConfiguration("http://minio:9000", "us-east-1")
>>>   )
>>>   .withCredentials(
>>> new AWSStaticCredentialsProvider(new
>>> BasicAWSCredentials("minio", "minio123"))
>>>   )
>>>   .build
>>>
>>> Thanks
>>>
>>>
>>>

-- 
Thanks & Regards
Sri Tummala


Re: Apache Flink Job fails repeatedly due to RemoteTransportException

2020-02-24 Thread M Singh
Thanks, will try your recommendations, and apologies for the delayed response.

On Wednesday, January 29, 2020, 09:58:26 AM EST, Till Rohrmann  wrote:

Hi M Singh,

Have you checked the TaskManager logs of ip-xx-xxx-xxx-xxx.ec2.internal/xx.xxx.xxx.xxx:39623 for any suspicious logging statements? This might help to uncover why another node thinks that this TaskManager is no longer reachable.

You could also try whether the same problem remains if you upgrade to one of Flink's latest versions (1.9.1 for example).

Cheers,
Till
On Wed, Jan 29, 2020 at 8:37 AM M Singh  wrote:

Hi Folks:

We have a streaming Flink application (using v1.6.2) and it dies within 12 hours. We have configured the number of restarts, which is 10 at the moment.

Sometimes the job runs for some time and then, within a very short time, has a number of restarts and finally fails. In other instances, the restarts happen randomly. So there is no pattern that I could discern for the restarts.

I can increase the restart count but would like to see if there is any advice on the root cause of this issue. I've seen some emails in the user groups but could not find any definitive solution or investigation steps.

Is there any advice on how to investigate it further or resolve it?

The exception we see in the job manager is:
2020-01-29 06:15:42,371 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job testJob 
(d65a52389f9ea30def1fe522bf3956c6) switched from state FAILING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'ip-xx-xxx-xxx-xxx.ec2.internal/xx.xxx.xxx.xxx:39623'. This might indicate that 
the remote task manager was lost.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at java.lang.Thread.run(Thread.java:748)
2020-01-29 06:15:42,371 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not 
restart the job testJob (d65a52389f9ea30def1fe522bf3956c6) because the restart 
strategy prevented it.

  

Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-24 Thread M Singh
Thanks Wang for your detailed answers.

From what I understand, the native_kubernetes also leans towards creating a
session and submitting a job to it.

Regarding other recommendations, please see my inline comments and advice.
On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang 
 wrote:  
 
Hi Singh,

Glad to hear that you are looking to run Flink on the Kubernetes. I am trying to answer your question based on my limited knowledge and others could correct me and add some more supplements.

I think the biggest difference between session cluster and per-job cluster on Kubernetes is the isolation. Since for per-job, a dedicated Flink cluster will be started for only one job and no other jobs could be submitted. Once the job is finished, then the Flink cluster will be destroyed immediately. The second point is one-step submission. You do not need to start a Flink cluster first and then submit a job to the existing session.

> Are there any benefits with regards to
> 1. Configuring the jobs
No matter whether you are using the per-job cluster or submitting to the existing session cluster, they share the configuration mechanism. You do not have to change any codes and configurations.

2. Scaling the taskmanager
Since you are using the Standalone cluster on Kubernetes, it does not provide an active resourcemanager. You need to use external tools to monitor and scale up the taskmanagers. The active integration is still evolving and you could have a taste[1].

Mans - If we use the session based deployment option for K8 - I thought K8 will automatically restart any failed TM or JM. In the case of failed TM - the job will probably recover, but in the case of failed JM - perhaps we need to resubmit all jobs. Let me know if I have misunderstood anything.

3. Restarting jobs
For the session cluster, you could directly cancel the job and re-submit. And for per-job cluster, when the job is canceled, you need to start a new per-job cluster from the latest savepoint.

4. Managing the flink jobs
The rest api and flink command line could be used for managing the jobs (e.g. flink cancel, etc.). I think there is no difference for session and per-job here.

5. Passing credentials (in case of AWS, etc)
I am not sure how you provide your credentials. If you put them in the config map and then mount into the jobmanager/taskmanager pod, then both session and per-job could support this way.

Mans - Is there any safe way of passing creds ?

6. Fault tolerance and recovery of jobs from failure
For a session cluster, if one taskmanager crashed, then all the jobs which have tasks on this taskmanager will fail. Both session and per-job could be configured with high availability and recover from the latest checkpoint.

Mans - Does a task manager failure cause the job to fail ?  My understanding is that JM failures are catastrophic while TM failures are recoverable.

> Is there any need for specifying volume for the pods?
No, you do not need to specify a volume for the pod. All the data in the pod local directory is temporary. When a pod crashed and relaunched, the taskmanager will retrieve the checkpoint from zookeeper + S3 and resume from the latest checkpoint.

Mans - So if we are saving checkpoint in S3 then there is no need for disks - should we use emptyDir ?


[1]. https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
M Singh  wrote on Sun, Feb 23, 2020 at 2:28 AM:

Hey Folks:

I am trying to figure out the options for running Flink on Kubernetes and am trying to find out the pros and cons of running in Flink Session vs Flink Job Cluster mode (https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#flink-session-cluster-on-kubernetes).

I understand that in job mode there is no need to submit the job since it is part of the job image. But what are the other pros and cons of this approach vs session mode, where a job manager is deployed and flink jobs can be submitted to it? Are there any benefits with regards to:

1. Configuring the jobs
2. Scaling the taskmanager
3. Restarting jobs
4. Managing the flink jobs
5. Passing credentials (in case of AWS, etc)
6. Fault tolerance and recovery of jobs from failure

Also, we will be keeping the checkpoints for the jobs on S3. Is there any need for specifying volume for the pods? If volume is required, do we need provisioned volume, and what are the recommended alternatives/considerations, especially with AWS?

If there are any other considerations, please let me know.

Thanks for your advice.




  

Getting javax.management.InstanceAlreadyExistsException when upgraded to 1.10

2020-02-24 Thread John Smith
Hi. I just upgraded to 1.10.0 and I am getting the below error when I deploy my
jobs.

The first one seems to deploy OK, but subsequent ones seem to throw this
error. They still seem to work, though.

javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=consumer-2
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:477)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:167)


Timeseries aggregation with many IoT devices off of one Kafka topic.

2020-02-24 Thread Marco Villalobos
I need to collect timeseries data from thousands of IoT devices. Each device 
has a name, value, and timestamp published to one Kafka topic.  The event time 
timestamps are in order only in relation to the individual device, but out of 
order with respect to other devices.

Is there a way to aggregate a 15-minute window of this in which each IoT
device gets aggregated with its own event time?

I would deeply appreciate if somebody could guide me to an approach for solving 
this in Flink.

I wish there was a group chat for these types of problems.



Map Of DataStream getting NullPointer Exception

2020-02-24 Thread aj
I am trying the below piece of code to create multiple DataStream objects and
store them in a map.

for (EventConfig eventConfig : eventTypesList) {
    LOGGER.info("creating a stream for {}", eventConfig.getEvent_name());
    String key = eventConfig.getEvent_name();

    final StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(path,
                    ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject())))
            .withBucketAssigner(new EventTimeBucketAssigner())
            .build();

    DataStream<GenericRecord> stream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
        if (genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) {
            return true;
        }
        return false;
    });

    Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> tuple2 = new Tuple2<>(stream, sink);
    streamMap.put(key, tuple2);
}

DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
searchStream.map(new Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));


I am getting a NullPointerException when trying to get the stream from the map
value at:

DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);

As per my understanding, this is because the map is local to main and not
broadcast to the tasks.
If I want to do this, how should I do it? Please help me resolve this.



-- 
Thanks & Regards,
Anuj Jain






Re: async io parallelism

2020-02-24 Thread Alexey Trenikhun
Arvid, thank you.
So there is a single FIFO instance per async IO operator, regardless of the
parallelism of the async IO operator?
Thanks,
Alexey


From: Arvid Heise 
Sent: Saturday, February 22, 2020 1:23:01 PM
To: Alexey Trenikhun 
Cc: user@flink.apache.org 
Subject: Re: async io parallelism

Hi Alexey,

the short answer is: order is preserved in all cases.

Basically, ordered asyncIO maintains an internal FIFO queue where all pending 
elements reside. All async results are saved into this queue, but elements will 
only be outputted when the head element has a result.

So assume you have three input records i1, i2, i3 and get the outputs 
asynchronously in the order o2, o1, o3 after 100 ms each, then there is no 
output after receiving o2, then o1 and o2 are outputted after 200 ms, and then 
o3 after 300 ms.
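
For reference, a minimal sketch of wiring such an ordered async operator; the enrichment function, the 1-second timeout and the capacity of 100 are made-up values:

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Enriched> enriched = AsyncDataStream.orderedWait(
    input,                       // DataStream<Event>
    new EnrichAsyncFunction(),   // extends RichAsyncFunction<Event, Enriched>
    1000, TimeUnit.MILLISECONDS, // timeout per element
    100);                        // capacity of the internal queue of in-flight elements

The capacity is the size of the internal queue described above; in ordered mode, completed results stay in that queue until the head element has completed.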

Best,

Arvid

On Sat, Feb 22, 2020 at 2:22 AM Alexey Trenikhun  wrote:
Hello,
Let's say, my elements are simple key-value pairs, elements are coming from 
Kafka, where they were partitioned by "key", then I do processing using 
KeyedProcessFunction (keyed by same "key"), then I enrich elements using 
ordered RichAsyncFunction, then output to another KeyedProcessFunction (keyed 
by same "key") and then write to Kafka topic, again partitioned by same "key", 
something like this:

FlinkKafkaConsumer -> keyBy("key") -> Intake(KeyedProcessFunction) -> 
AsyncDataStream.orderedWait() -> 
keyBy("key")->Output(KeyedProcessFunction)->FlinkKafkaProducer

Will it preserve order of events with same "key"?

  *   Will Output function receive elements with same "key" in same order as 
they were originally in Kafka?
  *   Will FlinkKafkaProducer writes elements with same "key" in same order as 
they were originally in Kafka?
  *   Does it depend on parallelism of async IO? Documentation says "the stream 
order is preserved", but if there are multiple parallel instances of async 
function, does it mean order relative to each single instance? Or total stream 
order?

Thanks,
Alexey


Colocating Sub-Tasks across jobs / Sharing Task Slots across jobs

2020-02-24 Thread Benoît Paris
Hello all!

I have a setup composed of several streaming pipelines. These have
different deployment lifecycles: I want to be able to modify and redeploy
the topology of one while the other is still up. I am thus putting them in
different jobs.

The problem is I have a Co-Location constraint between one subtask of each
pipeline; I'd like them to run on the same TaskSlots, much like if they
were sharing a TaskSlot; or at least have them on the same JVM.

A semi-official feature
"DataStream.getTransformation().setCoLocationGroupKey(stringKey)" [1]
exists for this, but seems to be tied to the Sub-Tasks actually being able
to be co-located on the same Task Slot.

The documentation mentions [2] that it might be impossible to do ("Flink
allows subtasks to share slots even if they are subtasks of different
tasks, so long as they are *from the same job*").

The streaming pipelines are numerous (about 10), and I can't afford to
increase the number of TaskSlots per TaskManager. I also would like to
avoid putting all the pipelines in the same job, restarting it every time a
single one changes.

I'd like to have mailing list's informed opinion about this, if there are
workarounds, or if I could reconsider my problem under another angle.

Cheers
Ben

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/translation/FlinkUniverse.java#L116

[2]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#task-slots-and-resources


Re: AWS Client Builder with default credentials

2020-02-24 Thread David Magalhães
Hi Robert, thanks for your reply.

GlobalConfiguration.loadConfiguration was useful to check if a
flink-conf.yml file was on resources, for the integration tests that I'm
doing. On the cluster I will use the default configurations.

On Fri, Feb 21, 2020 at 10:58 AM Robert Metzger  wrote:

> There are multiple ways of passing configuration parameters to your user
> defined code in Flink
>
> a)  use getRuntimeContext().getUserCodeClassLoader().getResource() to
> load a config file from your user code jar or the classpath.
> b)  use getRuntimeContext().getExecutionConfig().getGlobalJobParameters() to
> access a configuration object serialized from the main method.
> you can pass a custom object to the job parameters, or use Flink's
> "Configuration" object in your main method:
>
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> Configuration config = new Configuration();
> config.setString("foo", "bar");
> env.getConfig().setGlobalJobParameters(config);
>
> c) Load the flink-conf.yaml:
>
> Configuration conf = GlobalConfiguration.loadConfiguration();
>
> I'm not 100% sure if this approach works, as it is not intended to be used
> in user code (I believe).
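
For illustration, a minimal sketch of the consumer side of option (b), reading the
parameters back inside a rich function (the key "foo" mirrors the snippet above;
the mapper itself is made up):

import java.util.Collections;
import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ConfigAwareMapper extends RichMapFunction<String, String> {

    private transient String foo;

    @Override
    public void open(Configuration parameters) {
        // Reads back whatever was registered via env.getConfig().setGlobalJobParameters(...)
        // in the main method; may be null if nothing was set there.
        ExecutionConfig.GlobalJobParameters globalParams =
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        Map<String, String> params =
                globalParams == null ? Collections.emptyMap() : globalParams.toMap();
        foo = params.getOrDefault("foo", "not-set");
    }

    @Override
    public String map(String value) {
        return foo + ":" + value;
    }
}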
>
>
> Let me know if this helps!
>
> Best,
> Robert
>
> On Thu, Feb 20, 2020 at 1:50 PM Chesnay Schepler 
> wrote:
>
>> First things first, we do not intend for users to use anything in the S3
>> filesystem modules except the filesystems itself,
>> meaning that you're somewhat treading on unsupported ground here.
>>
>> Nevertheless, the S3 modules contain a large variety of AWS-provided
>> CredentialsProvider implementations,
>> that can derive credentials from environment variables, system
>> properties, files on the classpath and many more.
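
For illustration, a minimal sketch of building the client with the SDK's default
credentials provider chain instead of hardcoded keys (the sketch uses the plain
SDK package names; with the shaded module mentioned above the same classes sit
under the org.apache.flink.fs.s3base.shaded prefix, and the endpoint/region values
are only placeholders):

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class S3ClientSketch {

    public static AmazonS3 buildClient() {
        return AmazonS3ClientBuilder.standard()
                .withPathStyleAccessEnabled(true)
                // Endpoint/region are placeholders; drop this line to use the real AWS endpoint.
                .withEndpointConfiguration(new EndpointConfiguration("http://minio:9000", "us-east-1"))
                // Resolves credentials from env vars, system properties, ~/.aws/credentials,
                // or an instance/container role -- nothing hardcoded.
                .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
                .build();
    }
}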
>>
>> Ultimately though, you're kind of asking us how to use AWS APIs, for
>> which I would direct you to the AWS documentation.
>>
>> On 20/02/2020 13:16, David Magalhães wrote:
>>
>> I'm using
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.client.builder.AwsClientBuilder
>> to create an S3 client to copy objects and delete objects inside
>> a TwoPhaseCommitSinkFunction.
>>
>> Shouldn't there be another way to set up configurations without hardcoding
>> them? Something like core-site.xml or flink-conf.yaml?
>>
>> Right now I need to have them hardcoded like this.
>>
>> AmazonS3ClientBuilder.standard
>>   .withPathStyleAccessEnabled(true)
>>   .withEndpointConfiguration(
>> new EndpointConfiguration("http://minio:9000", "us-east-1")
>>   )
>>   .withCredentials(
>> new AWSStaticCredentialsProvider(new BasicAWSCredentials("minio",
>> "minio123"))
>>   )
>>   .build
>>
>> Thanks
>>
>>
>>


Re: 1.10 SqlClient startup error

2020-02-24 Thread godfrey he
hi, the image in your email is not visible; please attach it again, or paste the error message directly.

1  wrote on Mon, Feb 24, 2020 at 9:39 PM:

> Hi, all
>
> I started the 1.10 sql-client on Linux and ran into the following error message:
>
>
>
> Looking at the sql-client.sh script, it needs to find Sql-client.jar under the FLINK_OPT_DIR
> path, so I manually exported FLINK_OPT_DIR, but the error above still occurs. At the same
> time, the 1.9 sql-client starts up fine. This seems quite strange; could you please help take a look?
>
>
>
>


1.10 SqlClient startup error

2020-02-24 Thread 1
Hi, all

I started the 1.10 sql-client on Linux and ran into the following error message:




Looking at the sql-client.sh script, it needs to find Sql-client.jar under the FLINK_OPT_DIR
path, so I manually exported FLINK_OPT_DIR, but the error above still occurs. At the same
time, the 1.9 sql-client starts up fine. This seems quite strange; could you please help take a look?

Re: yarn session: one JVM per task

2020-02-24 Thread David Morin
Hi,

Thanks Xintong.
I've noticed that when I use yarn-session.sh with the --slots (-s) parameter but 
without --container (-n), it creates one task/slot per taskmanager. Before, with 
both -n and -s, that was not the case.
I prefer to use only small containers with a single task each, to scale my pipeline 
and of course to avoid thread-safety issues.
Do you think I cannot rely on that behaviour?

Regards,
David

On 2020/02/22 17:11:25, David Morin  wrote: 
> Hi,
> My app is based on a lib that is not thread safe (yet...).
> While waiting for the patch to be pushed, how can I be sure that my Sink, which 
> uses this lib, runs in a single JVM?
> Context: I use one Yarn session and send my Flink jobs to this session
> 
> Regards,
> David
> 


Re: StreamingFileSink Not Flushing All Data

2020-02-24 Thread Kostas Kloudas
Hi Austin,

Dawid is correct in that you need to enable checkpointing for the
StreamingFileSink to work.
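
For illustration, a minimal sketch of that fix (the checkpoint interval is
arbitrary, and the placeholder source only stands in for the pipeline quoted
below):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedFileSinkSketch {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bulk formats in the StreamingFileSink roll and commit their part files on checkpoints,
        // so without this call the in-progress files are never finalized.
        env.enableCheckpointing(10_000L); // 10s interval, purely illustrative

        // ... build the same pipeline as in the quoted example below, then:
        env.fromElements("placeholder").print(); // stand-in so the sketch runs on its own

        env.execute("checkpointing sketch");
    }
}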

I hope this solves the problem,
Kostas

On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
 wrote:
>
> Hi Austin,
>
> If I am not mistaken the StreamingFileSink by default flushes on checkpoints. 
> If you don't have checkpoints enabled it might happen that not all data is 
> flushed.
>
> I think you can also adjust that behavior with:
>
> forBulkFormat(...)
>
> .withRollingPolicy(/* your custom logic */)
>
> I also cc Kostas who should be able to correct me if I am wrong.
>
> Best,
>
> Dawid
>
> On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>
> Hi there,
>
> Using Flink 1.9.1, trying to write .tgz files with the 
> StreamingFileSink#BulkWriter. It seems like flushing the output stream 
> doesn't flush all the data written. I've verified I can create valid files 
> using the same APIs and data on their own, so thinking it must be something 
> I'm doing wrong with the bulk format. I'm writing to the local filesystem, 
> with the `file://` protocol.
>
> For Tar/ Gzipping, I'm using the Apache Commons Compression library, version 
> 1.20.
>
> Here's a runnable example of the issue:
>
> import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
> import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
> import 
> org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
> import org.apache.flink.api.common.serialization.BulkWriter;
> import org.apache.flink.core.fs.FSDataOutputStream;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.Serializable;
> import java.nio.charset.StandardCharsets;
>
> class Scratch {
>   public static class Record implements Serializable {
> private static final long serialVersionUID = 1L;
>
> String id;
>
> public Record() {}
>
> public Record(String id) {
>   this.id = id;
> }
>
> public String getId() {
>   return id;
> }
>
> public void setId(String id) {
>   this.id = id;
> }
>   }
>
>   public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> TarArchiveOutputStream taos = new TarArchiveOutputStream(new 
> GzipCompressorOutputStream(new 
> FileOutputStream("/home/austin/Downloads/test.tgz")));
> TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", 
> "test"));
> String fullText = "hey\nyou\nwork";
> byte[] fullTextData = fullText.getBytes();
> fileEntry.setSize(fullTextData.length);
> taos.putArchiveEntry(fileEntry);
> taos.write(fullTextData, 0, fullTextData.length);
> taos.closeArchiveEntry();
> taos.flush();
> taos.close();
>
> StreamingFileSink<Record> textSink = StreamingFileSink
> .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
> new BulkWriter.Factory<Record>() {
>   @Override
>   public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
> final TarArchiveOutputStream compressedOutputStream = new 
> TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>
> return new BulkWriter<Record>() {
>   @Override
>   public void addElement(Record record) throws IOException {
> TarArchiveEntry fileEntry = new 
> TarArchiveEntry(String.format("%s.txt", record.id));
> byte[] fullTextData = 
> "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
> fileEntry.setSize(fullTextData.length);
> compressedOutputStream.putArchiveEntry(fileEntry);
> compressedOutputStream.write(fullTextData, 0, 
> fullTextData.length);
> compressedOutputStream.closeArchiveEntry();
>   }
>
>   @Override
>   public void flush() throws IOException {
> compressedOutputStream.flush();
>   }
>
>   @Override
>   public void finish() throws IOException {
> this.flush();
>   }
> };
>   }
> })
> .withBucketCheckInterval(1000)
> .build();
>
> env
> .fromElements(new Record("1"), new Record("2"))
> .addSink(textSink)
> .name("Streaming File Sink")
> .uid("streaming-file-sink");
> env.execute("streaming file sink test");
>   }
> }
>
>
> From the stat/ hex dumps, you can see that the first bits are there, but are 
> then cut off:
>
> ~/Downloads » stat test.tgz
>   File: test.tgz
>   Size: 114   Blocks: 8  IO Block: 4096   

Re: StreamingFileSink Not Flushing All Data

2020-02-24 Thread Dawid Wysakowicz
Hi Austin,

If I am not mistaken the StreamingFileSink by default flushes on
checkpoints. If you don't have checkpoints enabled it might happen that
not all data is flushed.

I think you can also adjust that behavior with:

forBulkFormat(...)

.withRollingPolicy(/* your custom logic */)

I also cc Kostas who should be able to correct me if I am wrong.

Best,

Dawid

On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
> Hi there,
>
> Using Flink 1.9.1, trying to write .tgz files with the
> StreamingFileSink#BulkWriter. It seems like flushing the output stream
> doesn't flush all the data written. I've verified I can create valid
> files using the same APIs and data on their own, so thinking it must
> be something I'm doing wrong with the bulk format. I'm writing to the
> local filesystem, with the `file://` protocol.
>
> For Tar/ Gzipping, I'm using the Apache Commons Compression library,
> version 1.20.
>
> Here's a runnable example of the issue:
>
> import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
> import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
> import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
> import org.apache.flink.api.common.serialization.BulkWriter;
> import org.apache.flink.core.fs.FSDataOutputStream;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.Serializable;
> import java.nio.charset.StandardCharsets;
>
> class Scratch {
>   public static class Record implements Serializable {
>     private static final long serialVersionUID = 1L;
>
>     String id;
>
>     public Record() {}
>
>     public Record(String id) {
>       this.id = id;
>     }
>
>     public String getId() {
>       return id;
>     }
>
>     public void setId(String id) {
>       this.id = id;
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     TarArchiveOutputStream taos = new TarArchiveOutputStream(new GzipCompressorOutputStream(new FileOutputStream("/home/austin/Downloads/test.tgz")));
>     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
>     String fullText = "hey\nyou\nwork";
>     byte[] fullTextData = fullText.getBytes();
>     fileEntry.setSize(fullTextData.length);
>     taos.putArchiveEntry(fileEntry);
>     taos.write(fullTextData, 0, fullTextData.length);
>     taos.closeArchiveEntry();
>     taos.flush();
>     taos.close();
>
>     StreamingFileSink<Record> textSink = StreamingFileSink
>         .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
>             new BulkWriter.Factory<Record>() {
>               @Override
>               public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
>                 final TarArchiveOutputStream compressedOutputStream = new TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>
>                 return new BulkWriter<Record>() {
>                   @Override
>                   public void addElement(Record record) throws IOException {
>                     TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", record.id));
>                     byte[] fullTextData = "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
>                     fileEntry.setSize(fullTextData.length);
>                     compressedOutputStream.putArchiveEntry(fileEntry);
>                     compressedOutputStream.write(fullTextData, 0, fullTextData.length);
>                     compressedOutputStream.closeArchiveEntry();
>                   }
>
>                   @Override
>                   public void flush() throws IOException {
>                     compressedOutputStream.flush();
>                   }
>
>                   @Override
>                   public void finish() throws IOException {
>                     this.flush();
>                   }
>                 };
>               }
>             })
>         .withBucketCheckInterval(1000)
>         .build();
>
>     env
>         .fromElements(new Record("1"), new Record("2"))
>         .addSink(textSink)
>         .name("Streaming File Sink")
>         .uid("streaming-file-sink");
>     env.execute("streaming file sink test");
>   }
> }
>
> From the stat/ hex dumps, you can see that the first bits are there,
> but are then cut off:
>
> ~/Downloads » stat test.tgz
>   File: test.tgz
>   Size: 114       Blocks: 8          IO Block: 4096   regular file
> Device: 801h/2049d Inode: 30041077    Links: 1
> Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
> Access: 2020-02-21 19:30:06.009028283 -0500
> Modify: 2020-02-21 19:30:44.509424406 -0500
> Change: 2020-02-21 19:30:44.509424406 -0500
>  Birth: -
>
> ~/Downloads » tar -tvf test.tgz
> -rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt
>
> ~/Downloads » hd test.tgz
>   1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20
>  |1.. |
> 0010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03
>  |..afO...+.<.|
> 0020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 

Java implementations of Streaming applications for Flink

2020-02-24 Thread Piper Piper
Hi all,

The examples in the Flink github repo do not seem to include many standard
streaming applications compared to the batch examples.

Where can I get standard (recommended) Java implementations of “Streaming”
applications for Flink that are clearly: (1) CPU-intensive, like streaming
PageRank, streaming K-Means, etc., (2) I/O-intensive, and, if possible,
(3) network-intensive?

By "standard", I mean Java implementations that are accepted by the
community in terms of efficiency.

Thank you,

Piper


Re: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2

2020-02-24 Thread Congxian Qiu
+1 for dropping savepoint compatibility with Flink 1.2

Best,
Congxian


Dawid Wysakowicz  wrote on Mon, Feb 24, 2020 at 4:00 PM:

> +1 for dropping
>
> Best,
>
> Dawid
> On 24/02/2020 08:22, Yu Li wrote:
>
> +1 for dropping savepoint compatibility with Flink 1.2.
>
> Best Regards,
> Yu
>
>
> On Sat, 22 Feb 2020 at 22:05, Ufuk Celebi  wrote:
>
>> Hey Stephan,
>>
>> +1.
>>
>> Reading over the linked ticket and your description here, I think it
>> makes a lot of sense to go ahead with this. Since it's possible to upgrade
>> via intermediate Flink releases as a fail-safe I don't have any concerns.
>>
>> – Ufuk
>>
>>
>> On Fri, Feb 21, 2020 at 4:34 PM Till Rohrmann 
>> wrote:
>> >
>> > +1 for dropping savepoint compatibility with Flink 1.2.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Thu, Feb 20, 2020 at 6:55 PM Stephan Ewen  wrote:
>> >>
>> >> Thank you for the feedback.
>> >>
>> >> Here is the JIRA issue with some more explanation also about the
>> background and implications:
>> >> https://jira.apache.org/jira/browse/FLINK-16192
>> >>
>> >> Best,
>> >> Stephan
>> >>
>> >>
>> >> On Thu, Feb 20, 2020 at 2:26 PM vino yang 
>> wrote:
>> >>>
>> >>> +1 for dropping Savepoint compatibility with Flink 1.2
>> >>>
>> >>> Flink 1.2 is quite far away from the latest 1.10. Especially after
>> the release of Flink 1.9 and 1.10, the code and architecture have undergone
>> major changes.
>> >>>
>> >>> Currently, I am updating state migration tests for Flink 1.10. I can
>> still see some binary snapshot files of version 1.2. If we agree on this
>> topic, we may be able to alleviate some of the burdens(remove those binary
>> files) when the migration tests would be updated later.
>> >>>
>> >>> Best,
>> >>> Vino
>> >>>
>>  Theo Diefenthal  wrote on Thu, Feb 20, 2020 at 9:04 PM:
>> 
>>  +1 for dropping compatibility.
>> 
>>  I personally think that for a project to keep a good development pace, old
>> legacy stuff must be dropped from time to time. As long as there is an upgrade
>> routine (via going through another Flink release), that's fine.
>> 
>>  
>>  From: "Stephan Ewen" 
>>  To: "dev" , "user" 
>>  Sent: Thursday, February 20, 2020 11:11:43
>>  Subject: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2
>> 
>>  Hi all!
>>  For some cleanup and simplifications, it would be helpful to drop
>> Savepoint compatibility with Flink version 1.2. That version was released
>> almost three years ago.
>> 
>>  I would expect that no one uses that old version any more in a way
>> that they actively want to upgrade directly to 1.11.
>> 
>>  Even if, there is still the way to first upgrade to another version
>> (like 1.9) and then upgrade to 1.11 from there.
>> 
>>  Any concerns to drop that support?
>> 
>>  Best,
>>  Stephan
>> 
>> 
>>  --
>>  SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107
>> Köln
>>  Theo Diefenthal
>> 
>>  T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
>>  theo.diefent...@scoop-software.de - www.scoop-software.de
>>  Registered office: Köln, commercial register: Köln,
>>  registration number: HRB 36625
>>  Managing directors: Dr. Oleg Balovnev, Frank Heinen,
>>  Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel
>>
>>>


Re: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2

2020-02-24 Thread Dawid Wysakowicz
+1 for dropping

Best,

Dawid

On 24/02/2020 08:22, Yu Li wrote:
> +1 for dropping savepoint compatibility with Flink 1.2.
>
> Best Regards,
> Yu
>
>
> On Sat, 22 Feb 2020 at 22:05, Ufuk Celebi  wrote:
>
> Hey Stephan,
>
> +1.
>
> Reading over the linked ticket and your description here, I think
> it makes a lot of sense to go ahead with this. Since it's possible
> to upgrade via intermediate Flink releases as a fail-safe I don't
> have any concerns.
>
> – Ufuk
>
>
> On Fri, Feb 21, 2020 at 4:34 PM Till Rohrmann  wrote:
> >
> > +1 for dropping savepoint compatibility with Flink 1.2.
> >
> > Cheers,
> > Till
> >
> > On Thu, Feb 20, 2020 at 6:55 PM Stephan Ewen  wrote:
> >>
> >> Thank you for the feedback.
> >>
> >> Here is the JIRA issue with some more explanation also about
> the background and implications:
> >> https://jira.apache.org/jira/browse/FLINK-16192
> >>
> >> Best,
> >> Stephan
> >>  
> >>
> >> On Thu, Feb 20, 2020 at 2:26 PM vino yang  wrote:
> >>>
> >>> +1 for dropping Savepoint compatibility with Flink 1.2
> >>>
> >>> Flink 1.2 is quite far away from the latest 1.10. Especially
> after the release of Flink 1.9 and 1.10, the code and architecture
> have undergone major changes.
> >>>
> >>> Currently, I am updating state migration tests for Flink 1.10.
> I can still see some binary snapshot files of version 1.2. If we
> agree on this topic, we may be able to alleviate some of the
> burdens(remove those binary files) when the migration tests would
> be updated later.
> >>>
> >>> Best,
> >>> Vino
> >>>
> >>> Theo Diefenthal  wrote on Thu, Feb 20, 2020 at 9:04 PM:
> 
>  +1 for dropping compatibility.
> 
>  I personally think that for a project to keep a good development pace, old
> legacy stuff must be dropped from time to time. As long as there is an upgrade
> routine (via going through another Flink release), that's fine.
> 
>  
>  From: "Stephan Ewen" 
>  To: "dev" , "user" 
>  Sent: Thursday, February 20, 2020 11:11:43
>  Subject: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2
> 
>  Hi all!
>  For some cleanup and simplifications, it would be helpful to
> drop Savepoint compatibility with Flink version 1.2. That version
> was released almost three years ago.
> 
>  I would expect that no one uses that old version any more in
> a way that they actively want to upgrade directly to 1.11.
> 
>  Even if, there is still the way to first upgrade to another
> version (like 1.9) and then upgrade to 1.11 from there.
> 
>  Any concerns to drop that support?
> 
>  Best,
>  Stephan
> 
> 
>  --
>  SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P -
> D-51107 Köln
>  Theo Diefenthal
> 
>  T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
>  theo.diefent...@scoop-software.de
>  - www.scoop-software.de
> 
>  Registered office: Köln, commercial register: Köln,
>  registration number: HRB 36625
>  Managing directors: Dr. Oleg Balovnev, Frank Heinen,
>  Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel
>

