Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-12-02 Thread Victor Wong
Hi,

We encountered similar issues that the task manager kept being killed by
yarn.

- flink 1.9.1
- heap usage is low.

But our job is a **streaming** job, so I want to ask if this issue is only
related to **batch** job or not? Thanks!

Best,
Victor


yingjie  于2019年11月28日周四 上午11:43写道:

> Piotr is right, that depend on the data size you are reading and the memory
> pressure. Those memory occupied by mmapped region can be recycled and used
> by other processes if memory pressure is high, that is, other process or
> service on the same node won't be affected because the OS will recycle the
> mmapped pages if needed. But currently, you can't assume a bound of the
> memory can be used, because it will use more memory as long as there is
> free
> space and you have more new data to read.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Best,
Victor


Re: Limit max cpu usage per TaskManager

2019-11-06 Thread Victor Wong
Hi Lu,

You can check out which operator thread causes the high CPU usage, and set a 
unique slot sharing group name [1] to it to prevent too many operator threads 
running in the same TM.
Hope this will be helpful

[1]. 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups

Best,
Victor

From: Vino Yang 
Date: Wednesday, 6 November 2019 at 4:26 PM
To: Lu Niu 
Cc: user 
Subject: Re: Limit max cpu usage per TaskManager

Hi Lu,

When using Flink on YARN, it will rely on YARN's resource management 
capabilities, and Flink cannot currently limit CPU usage.

Also, what version of Flink do you use? As far as I know, since Flink 1.8, the 
-yn parameter will not work.

Best,
Vino

Lu Niu mailto:qqib...@gmail.com>> 于2019年11月6日周三 下午1:29写道:
Hi,

When run flink application in yarn mode, is there a way to limit maximum cpu 
usage per TaskManager?

I tried this application with just source and sink operator. parallelism of 
source is 60 and parallelism of sink is 1. When running in default config, 
there are 60 TaskManager assigned. I notice one TaskManager process cpu usage 
could be 200% white the rest below 50%.

When I set -yn = 2 (default is 1), I notice # of TaskManger dropped down to 30. 
and one TaskManger process cpu usage could be 600% while the rest below 50%.

Tried to set yarn.containers.vcores = 2,  all tasks are in start state forever, 
application is not able to turn to running state.

Best
Lu


Re: Does operator uid() have to be unique across all jobs?

2019-10-25 Thread Victor Wong
Hi,
  “uid” is mainly useful when you upgrade your application. It’s used to match 
the operator state stored in the savepoint.
  As suggested in [1], “it is highly recommended to assign unique IDs to all 
operators of an application that might be upgraded in the future.”

  [1]. 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state

From: "min@ubs.com" 
Date: Thursday, 24 October 2019 at 11:31 PM
To: "java.dev@gmail.com" , "dian0511...@gmail.com" 

Cc: "user@flink.apache.org" 
Subject: RE: Does operator uid() have to be unique across all jobs?

Hi,

I have some simple questions on the uid as well.


  1.  Do we add a uid for every operator e.g. print(), addSink and addSource?
  2.  For chained operators, do we need to uids for each operator? Or just the 
last operator?

e.g. .map().uid("some-id").print().uid("print-id");




Regards,


Min

From: John Smith [mailto:java.dev@gmail.com]
Sent: Donnerstag, 24. Oktober 2019 16:32
To: Dian Fu
Cc: user
Subject: [External] Re: Does operator uid() have to be unique across all jobs?

Ok cool. Thanks

BTW this seems a bit cumbersome...

.map().uid("some-id").name("some-id");

On Wed, 23 Oct 2019 at 21:13, Dian Fu 
mailto:dian0511...@gmail.com>> wrote:
Yes, you can use it in another job. The uid needs only to be unique within a 
job.

> 在 2019年10月24日,上午5:42,John Smith 
> mailto:java.dev@gmail.com>> 写道:
>
> When setting uid() of an operator does it have to be unique across all jobs 
> or just unique within a job?
>
> For example can I use env.addSource(myKafkaConsumer).uid("kafka-consumer") in 
> another job?


Re: 如何优化flink内存?

2019-09-04 Thread Victor Wong
这种情况不建议使用滑动窗口,因为会保存大量的窗口数据(24小时/1分钟);
可以自定义ProcessFunction,参照[1];

[1]. 
https://stackoverflow.com/questions/51977741/flink-performance-issue-with-sliding-time-window





On 04/09/2019, 8:07 PM, "Yifei Qi"  wrote:

>大家好:
>
>
>
>不知道大家在使用flink时遇到过内存消耗过大的问题么?
>
>
>
>我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>
>
>
>具体情况是这样的:
>
>准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>
>按照用户进行分组.
>
>计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>
>
>
>
>
>flink运行在3个节点后, 内存合计就用了5G.
>
>
>
>
>
>flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>
>
>
>
>
>顺祝商祺
>
>
>-- 
>
>
>Qi Yifei
>[image: https://]about.me/qyf404
>


Re: Flink Kafka Connector相关问题

2019-08-22 Thread Victor Wong
Hi 鑫铉:
  我尝试解答下;
  
  1. 这是否意味着使用FlinkKafkaConsumer09必须开启checkpoint机制,不然不会定期提交offset至kafka呢?
  根据官方文档  [1],checkpoint offset是Flink的功能,auto commit offset是kafka 
client的功能;这俩功能在作用上有一定重叠,所以根据配置会有不同的行为表现;
  如果Flink没有开启checkpoint,那么offset的保存依赖于kafka client的auto commit offset;
  如果Flink开启了checkpoint,那么auto commit 
offset会被Flink禁用;在flink完成checkpoint之后,根据用户配置决定是否提交offset到zookeeper (kafka 0.8) 或 
kafka broker ( kafka 0.8+);
  结论:如果不开启checkpoint,只要kafka properties中配置了 " enable.auto.commit"为true 和 " 
auto.commit.interval.ms"大于0,就能定期提交offset到kafka;

  2. current-offsets、committed-offsets、consumer lag;
  根据官方文档 [2],
  current-offsets是当前Flink读取到的最新offset;
  committed-offsets是提交到zookeeper/kafka broker 的offset;
  consumer lag是指topic最新的offset(log end offset) 和 
committed-offsets的差值;Flink没有提供consumer lag信息,该信息依赖于kafka及其相关运维工具生成;

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-connector-metrics


On 2019/8/22, 7:21 PM, "戴鑫铉"  wrote:

您好,这次发邮件主要想请教一下关于Flink Kafka Connector的一些问题:


1、在官方文档中有提到关闭checkpoint,flink仍会定时提交offset。但经过测试和观察源码,发现只有FlinkKafkaConsumer08会在createFetcher时就创建一个定期提交offset的committer,而FlinkKafkaConsumer09和FlinkKafkaConsumer010似乎只能依赖commitInternalOffsetsToKafka()方法提交Offset,该方法只有在checkpoint打开的情况才会被调用。这是否意味着使用FlinkKafkaConsumer09必须开启checkpoint机制,不然不会定期提交offset至kafka呢?

2、还想问下flink kafka

connector中的currentoffset和commitoffset,currentoffset-commitoffset就算lag值嘛,官方文档有提到监控lag值,但是我个人感觉currentoffset和kafka内的Log
End Offset不是一回事啊?能请详细解释一下吗?




Re: Understanding job flow

2019-08-16 Thread Victor Wong
Hi Vishwas,

Since `DruidStreamJob` is an “object” of scala, and the initialization of your 
sds client is not within any method, it will be called every time ` 
DruidStreamJob` is loaded (like static block in Java).
Your taskmanagers are different JVM processes, and ` DruidStreamJob` needs to 
be loaded within them, so the initialization of your sds client is called each 
time.

You can try to put the initialization within `runJob` method, and pass it down 
as a parameter.
But I wonder if a sds client can be serialized or not, since this kind of 
client usually holds a http connection, which cannot be serialized.

Best,
Victor

From: Vishwas Siravara 
Date: Friday, August 16, 2019 at 11:48 PM
To: Steven Nelson , user 
Subject: Re: Understanding job flow

I did not find this to be true. Here is my code snippet.


object DruidStreamJob extends Job with SinkFn {

  private[flink] val druidConfig = DruidConfig.current

  private[flink] val decryptMap = ExecutionEnv.loadDecryptionDictionary

  //  //TODO: Add this to sbt jvm, this should be set in sbt fork jvm. This is 
hack.
  //  System.setProperty("java.library.path", 
"/Users/vsiravar/workspace/aipcryptoclient/lib")
  //
  //  import java.lang.reflect.Field
  //
  //  val fieldSysPath: Field = 
classOf[ClassLoader].getDeclaredField("sys_paths")
  //  fieldSysPath.setAccessible(true)
  //  fieldSysPath.set(null, null)
  //
  //  print(System.getProperty("java.library.path"))

  private[flink] val aipSimpleAPIEncryptor = new AipCryptoClient(
ExecutionEnv.mockEncryption,
ExecutionEnv.enableEncryption,
ExecutionEnv
  .loadEncryptionSet)

  aipSimpleAPIEncryptor.init("aip_crypto_config.properties")

  val appLogger: Logger = LoggerFactory.getLogger(DruidStreamJob.getClass)

  val errorLogger: Logger = LoggerFactory.getLogger("streaming.error")

  private[flink] val sdsClient = SDSEncryptor(decryptMap, 
ExecutionEnv.mockDecryption)

  sdsClient.init()

  /**
   * Start streaming job execution .
   *
   * @param argMap
   */
  private[flink] def runJob(argMap: Map[String, String]): Unit = {

val env = ExecutionEnv.executionEnv(argMap)
this.source = ExecutionEnv.sourceTopics

env.enableCheckpointing(1000)
env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))
sourceAndSinkFn(env, source)
env.execute(jobName = name)
  }

  /**
   * @inheritdoc
   * @param env
   * @param topics
   */
  override private[flink] def sourceAndSinkFn(
env: StreamExecutionEnvironment,
topics: List[String]) = {
val dataStream = addSource(env)
log.info("Subscribed to topics" + topics)

val filteredStream = dataStream.filter(new FilterFunction[GenericRecord] {

  override def filter(value: GenericRecord): Boolean = {

ExecutionEnv.messageTypeList.contains(value.get("CMLS_REQST_MSG_TYP").toString) 
& ExecutionEnv
  .pcrList.contains(value.get("CMLS_DEST_PCR").toString)
  }
})

val result = filteredStream.map(record => 
encryptWithAipCryptoClient(addTimeStamp(sdsClient
  .decrypt(applyValues(record)

result.print()
KafkaSink(result).sendToKafka
  }

  private[flink] def encryptWithAipCryptoClient(maptoEncrypt: 
mutable.Map[String, Any]): mutable.Map[String, Any] = {
aipSimpleAPIEncryptor.encrypt(maptoEncrypt.asInstanceOf[mutable.Map[String, 
AnyRef]].asJava)
maptoEncrypt
  }

  private[flink] def applyValues(
genericRecord: GenericRecord): mutable.Map[String, Any] = {

collection.mutable.Map(genericRecord.getSchema.getFields.asScala
  .map(field =>
field.schema().getType match {
  case Schema.Type.LONG =>
field.name() -> 
genericRecord.get(field.name()).asInstanceOf[Long]
  case Schema.Type.INT =>
field.name() -> 
genericRecord.get(field.name()).asInstanceOf[Int]
  case Schema.Type.DOUBLE =>
field.name() -> 
genericRecord.get(field.name()).asInstanceOf[Double]
  case Schema.Type.STRING =>
field.name() -> 
genericRecord.get(field.name()).toString
  case _ =>
field.name() -> 
genericRecord.get(field.name()).toString
}): _*)

  }

  private[flink] def addTimeStamp(payload: mutable.Map[String, Any]): 
mutable.Map[String, Any] = {
try {
  if (!payload("CMLS_CPD_ORIG_DT").equals("19000101")) {
return payload + ("timestamp" -> 
TimeUtility.convertDateStringToLong(payload("CMLS_CPD_ORIG_DT").asInstanceOf[String],
 payload("CMLS_AUTH_TIME").asInstanceOf[Int]));
  }
  return payload + ("timestamp" -> System.currentTimeMillis())
} catch {
  case e: Exception => {
errorLogger.error("Unable to obtain epoch time, using currentSystem 
time" + e.printStackTrace())
return payload + ("timestamp" -> 

Re: **RegistrationTimeoutException** after TaskExecutor successfully registered at resource manager

2019-08-09 Thread Victor Wong
Hi Biao,

Thanks for your reply, I will give it a try (1.8+)!

Best,
Victor

From: Biao Liu 
Date: Friday, August 9, 2019 at 5:45 PM
To: Victor Wong 
Cc: "user@flink.apache.org" 
Subject: Re: **RegistrationTimeoutException** after TaskExecutor successfully 
registered at resource manager

Hi Victor,

There used to be several relevant issues reported [1] [2] [3]. I guess you have 
encountered the same problem.
This issue has been fixed in 1.8 [4]. Could you try it on a later version 
(1.8+)?

1. https://issues.apache.org/jira/browse/FLINK-11137
2. https://issues.apache.org/jira/browse/FLINK-11215
3. https://issues.apache.org/jira/browse/FLINK-11708
4. https://issues.apache.org/jira/browse/FLINK-11718

Thanks,
Biao /'bɪ.aʊ/



On Fri, Aug 9, 2019 at 4:01 PM Victor Wong 
mailto:jiasheng.w...@outlook.com>> wrote:
Hi,
I’m using Flink version 1.7.1, and I encountered this exception which was a 
little weird from my point of view;
TaskManager successfully registered at resource manager, however after 5 
minutes (which is the default value of taskmanager.registration.timeout config) 
it threw out RegistrationTimeoutException;

Here is the related logs of TM:
2019-08-09 01:30:24,061 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting to 
ResourceManager 
akka.tcp://flink@xxx/user/resourcemanager().
2019-08-09 01:30:24,296 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved 
ResourceManager address, beginning registration
2019-08-09 01:30:24,296 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Registration at 
ResourceManager attempt 1 (timeout=100ms)
2019-08-09 01:30:24,379 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful 
registration at resource manager akka.tcp://flink@xxx/user/resourcemanager 
under registration id 4535dea14648f6de68f32fb1a375806e.
2019-08-09 01:30:24,404 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
request AllocationID{372d1e10019c93c6c41d52b449cea5f2} for job 
e7b86795178efe43d7cac107c6cb8c33 from resource manager with leader id 
.
…
2019-08-09 01:30:33,590 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Un-registering 
task and sending final execution state FINISHED to JobManager for task Source: 
 ; // I don’t know if this is related, so I add it here in case;  This 
Flink Kafka source just finished because it consumed no Kafka partitions (Flink 
Kafka parallelism > Kafka topic partitions)
…
2019-08-09 01:35:24,753 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Fatal error 
occurred in TaskExecutor akka.tcp://flink@xxx/user/taskmanager_0.
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: 
Could not register at the ResourceManager within the specified maximum 
registration duration 30 ms. This indicates a problem with this instance. 
Terminating now.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1037)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$3(TaskExecutor.java:1023)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks,
Victor


**RegistrationTimeoutException** after TaskExecutor successfully registered at resource manager

2019-08-09 Thread Victor Wong
Hi,
I’m using Flink version 1.7.1, and I encountered this exception which was a 
little weird from my point of view;
TaskManager successfully registered at resource manager, however after 5 
minutes (which is the default value of taskmanager.registration.timeout config) 
it threw out RegistrationTimeoutException;

Here is the related logs of TM:
2019-08-09 01:30:24,061 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting to 
ResourceManager 
akka.tcp://flink@xxx/user/resourcemanager().
2019-08-09 01:30:24,296 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved 
ResourceManager address, beginning registration
2019-08-09 01:30:24,296 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Registration at 
ResourceManager attempt 1 (timeout=100ms)
2019-08-09 01:30:24,379 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful 
registration at resource manager akka.tcp://flink@xxx/user/resourcemanager 
under registration id 4535dea14648f6de68f32fb1a375806e.
2019-08-09 01:30:24,404 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
request AllocationID{372d1e10019c93c6c41d52b449cea5f2} for job 
e7b86795178efe43d7cac107c6cb8c33 from resource manager with leader id 
.
…
2019-08-09 01:30:33,590 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Un-registering 
task and sending final execution state FINISHED to JobManager for task Source: 
 ; // I don’t know if this is related, so I add it here in case;  This 
Flink Kafka source just finished because it consumed no Kafka partitions (Flink 
Kafka parallelism > Kafka topic partitions)
…
2019-08-09 01:35:24,753 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Fatal error 
occurred in TaskExecutor akka.tcp://flink@xxx/user/taskmanager_0.
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: 
Could not register at the ResourceManager within the specified maximum 
registration duration 30 ms. This indicates a problem with this instance. 
Terminating now.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1037)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$3(TaskExecutor.java:1023)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks,
Victor


Re: Delayed processing and Rate limiting

2019-08-07 Thread Victor Wong
Hi Shakir,

> Delayed Processing
Maybe you can make use of the function 
‘org.apache.flink.streaming.api.TimerService#registerProcessingTimeTimer’, 
check this doc for more details:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example

> Rate Limit External Data Access
With AsyncFunction, you can set a ‘capacity’ which defines how many 
asynchronous requests may be in progress at the same time, I’m not sure if this 
is what you need or not.
Check this doc for more details: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api

Best,
Victor

From: "PoolakkalMukkath, Shakir" 
Date: Wednesday, August 7, 2019 at 10:06 PM
To: user 
Subject: Delayed processing and Rate limiting

Hi Flink Team,

I am looking for some direction/recommendation for below tasks


  1.  Delayed Processing:
Having a use case where we need to process events after a time-delay from event 
time. Let’s say, the event happened at time t1, and it reached the Flink 
immediately, but I have to wait t1+2min to process this.
We are sourcing the events from Kafka, we like this applied after SourceFn. May 
be we can do this by ThreadSleep() on a MapFn, but looking if there is a better 
way to achieve this.


  1.  Rate Limit External Data Access

The best practices to implement rate limiting to external service, it can be 
either on mapFn or AsynchFn. What is the recommended approach to rate limit and 
 build backpressure.

Thanks in advance

Thanks,
Shakir


Re: FlatMap returning Row<> based on ArrayList elements()

2019-08-07 Thread Victor Wong
Hi Andres,



I’d like to share my thoughts:

When you register a “Table”, you need to specify its “schema”, so how can you 
register the table when the number of elements/columns and data types are both 
nondeterministic.

Correct me if I misunderstood your meaning.



Best,

Victor

From: Andres Angel 
Date: Wednesday, August 7, 2019 at 9:55 PM
To: Haibo Sun 
Cc: user 
Subject: Re: FlatMap returning Row<> based on ArrayList elements()

Hello everyone, let me be more precis on what I'm looking for at the end 
because your example is right and very accurate in the way about how to turn an 
array into a Row() object.
I have done it seamlessly:

out.collect(Row.of(pelements.toArray()));

Then I printed and the outcome is as expected:

5d2df2c2e7370c7843dad9ca,359731,1196,156789925,619381

Now I need to register this DS as a table and here is basically how I'm 
planning to do it:

tenv.registerDataStream("newtable", ds, 'col1,col2,col3,col4,col5');

However, this returns an error on the DS registration due to I need to specify 
the RowTypeInfo. Here is the big deal because yes I know I would be able to use 
something like :


TypeInformation[] types = {
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO};

































DataStream ds = previousds.flatMap(new FlatMapFunction, Row>() {
@Override
public void flatMap(List value, Collector out) throws Exception {
out.collect(Row.of(value.toArray(new Integer[0])));
}
}).return(types);


The problem with this approach is that I'm looking for a standard FlatMap 
anonymous function that could return every time: 1. different number of 
elements within the Array and 2. the data type can be random likewise. I mean 
is not fixed the whole time then my TypeInformation return would fix every 
execution.

How could I approach this?

thanks so much
AU


On Wed, Aug 7, 2019 at 3:57 AM Haibo Sun 
mailto:sunhaib...@163.com>> wrote:
Hi Andres Angel,

I guess people don't understand your problem (including me). I don't know if 
the following sample code is what you want, if not, can you describe the 
problem more clearly?

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Arrays.asList(1, 2, 3, 4, 5))
.flatMap(new FlatMapFunction, Row>() {
@Override
public void flatMap(List value, Collector out) throws Exception {
out.collect(Row.of(value.toArray(new Integer[0])));
}
}).print();

env.execute("test job");

Best,
Haibo

At 2019-07-30 02:30:27, "Andres Angel" 
mailto:ingenieroandresan...@gmail.com>> wrote:

Hello everyone,

I need to parse into an anonymous function an input data to turn it into 
several Row elements. Originally I would have done something like 
Row.of(1,2,3,4) but these elements can change on the flight as part of my 
function. This is why I have decided to store them in a list and right now it 
looks something like this:

[image.png]

Now, I need to return my out Collector it Row<> based on this elements. I 
checked on the Flink documentation but the Lambda functions are not supported : 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html , 
Then I though ok I can loop my ArrayList in a Tuple and pass this tuple as 
Row.of(myTuple):

Tuple mytuple = Tuple.newInstance(5);
for (int i = 0; i < pelements.size(); i++) {
mytuple.setField(pelements.get(i), i);
}
out.collect(Row.of(mytuple));


However , it doesnt work because this is being parsed s 1 element for  sqlQuery 
step. how could I do something like:

pelements.forEach(n->out.collect(Row.of(n)));

Thanks so much


Re: How to use Lo-level Joins API

2019-08-06 Thread Victor Wong
Hi Yuta,

> I made sure the 'ValueState data' has data from stream1 with the IDE
debugger but in spite of that, processElement2 can't access it.

Since `processElement1` and `processElement2`  use the same `Context`, I think 
there is no state access issue.
Is it possible stream1 and stream2 don't have common keys?  You may verify this 
by logging out the key of current processed element.

Best,
Victor

On 2019/8/7, 10:56 AM, "Yuta Morisawa"  wrote:

Hi Yun

Thank you for replying.
 >Have you set a default value for the state ?
Actually, the constructor of the ValueStateDescriptor with default value 
is deprecated so I don't set it.

The problem occurs when the stream1 comes first.
I made sure the 'ValueState data' has data from stream1 with the IDE 
debugger but in spite of that, processElement2 can't access it.

On 2019/08/07 11:43, Yun Gao wrote:
> Hi Yuta,
>Have you set a default value for the state ? If the state did not 
> have a default value and the records from stream2 comes first for a 
> specific key, then the state would never be set with a value, thus the 
> return value will be null.
> 
> Best,
> Yun
> 
> 
> --
> From:Yuta Morisawa 
> Send Time:2019 Aug. 7 (Wed.) 08:56
> To:user 
> Subject:How to use Lo-level Joins API
> 
> Hi
> 
> I am trying to use low-level joins.
> According to the doc, the way is creating a state and access it from
> both streams, but I can't.
> 
> 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html
> 
> This is a snippet of my code.
> It seems that the processElement1,2 have different ValueStates so that
> v1 in processElement2 is always null.
> 
> ---
> stream1.connect(stream2).keyBy(0,0).process(new MyCPF());
> 
> public class MyCPF extends CoProcessFunction{
>ValueState data;
> 
>processElement1(v1){
>  data.update(v1);
>}
> 
>processElement2(v2){
>  v1 = data.value() // v1 is always null
>  out.collect(v1 + v2)
>}
> 
>open(){
>  data = getRuntimeContext().getState(descriptor);
>}
> 
> }
> ---
> 
> Can you tell me the collect way of the low-level joins and send me a
> sample code if you have?
> 
> --
> Thank you
> Yuta
> 
> 

-- 


Challenge for the future 豊かな未来への挑戦

Tomorrow, Together  KDDI
-
   〒356-8502
 埼玉県ふじみ野市大原2丁目1番15号
 株式会社 KDDI総合研究所(KDDI Research, Inc.)
 コネクティッドカー1G
 森澤 雄太
 mail yu-moris...@kddi-research.jp
 tel  070-3871-8883

 この電子メールおよび添付書類は、名宛人のための
 特別な秘密情報を含んでおります。
 そのため、名宛人以外の方による利用は認められて
 おりません。
 名宛人以外の方による通信内容公表、複写、転用等
 は厳禁であり、違法となることがあります。
 万が一、何らかの誤りによりこの電子メールを名宛
 人以外の方が受信された場合は、お手数でも、直ち
 に発信人にお知らせ頂くと同時に、当メールを削除
 下さいますようお願い申し上げます。