Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Yuval Itzchakov
Debasish, could you share a before-and-after example of your classes for future reference? On Thu, 19 Sep 2019, 10:42 Debasish Ghosh wrote: > We solved the problem of serialization by making some things transient > which were being captured as part of the closure. So we no longer have > ser

Re: Client for Monitoring API!

2019-09-19 Thread Anis Nasir
Thanks, Biao, for your response. We would like to fetch metrics at the subtask level for each checkpoint. This information is not exposed via the default metrics, but it is available through the REST endpoint. We would also like to persist the history of checkpoints; this information is lost whenever we restart the jo
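The REST endpoint mentioned above can be polled from a small client. This is a minimal sketch that assumes the JobManager's REST API is reachable at a base URL such as `http://localhost:8081`. It uses the `/jobs/<jobid>/checkpoints` and `/jobs/<jobid>/checkpoints/details/<checkpointid>/subtasks/<vertexid>` paths. The class `CheckpointStatsClient` is hypothetical, and persisting the returned JSON is left to the caller.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for polling Flink's REST API; the base URL is an assumption.
class CheckpointStatsClient {
    private final String baseUrl;
    private final HttpClient http = HttpClient.newHttpClient();

    CheckpointStatsClient(String baseUrl) {
        // Drop a trailing slash so paths can be appended uniformly.
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
    }

    // Checkpoint statistics for one job.
    URI checkpointsUri(String jobId) {
        return URI.create(baseUrl + "/jobs/" + jobId + "/checkpoints");
    }

    // Per-subtask statistics of one checkpoint for one job vertex.
    URI subtaskStatsUri(String jobId, long checkpointId, String vertexId) {
        return URI.create(baseUrl + "/jobs/" + jobId + "/checkpoints/details/"
                + checkpointId + "/subtasks/" + vertexId);
    }

    // Fetch the raw JSON body; storing it somewhere durable is up to the caller.
    String fetch(URI uri) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return http.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }
}
```

One way to keep a history that survives job restarts is to call `fetch(checkpointsUri(jobId))` periodically and store each response externally. The exact JSON fields may differ between Flink versions.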

Re: Flink 1.9, MapR secure cluster, high availability

2019-09-19 Thread Stephan Ewen
Hi!
Not sure what is happening here.
- I cannot understand why MapR FS should use Flink's relocated ZK dependency.
- It might be that it doesn't, and that all the logging we see comes from Flink's HA services. Maybe the MapR stuff uses a different logging framework and the logs do not

Re: Error "Failed to load native Mesos library from" when I run Flink on a compiled version of Apache Mesos

2019-09-19 Thread Felipe Gutierrez
Thanks Rui, the problem still persists after I try to start Flink on Mesos with your command. This is what I have in conf/flink-conf.yaml:
mesos.master: 127.0.0.1:5050
mesos.initial-tasks: 1
mesos.resourcemanager.tasks.container.type: mesos
jobmanager.heap.mb: 1024
jobmanager.web.address: 127.0.0.

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Debasish Ghosh
Hi Yuval - Here's a brief summary of what we are trying to do. At the library level we have this:

def buildExecutionGraph(): Unit

def executeStreamingQueries(env: StreamExecutionEnvironment): JobExecutionResult = {
  buildExecutionGraph()
  env.execute(s"Executing $streamletRef")
}

and we d

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-19 Thread Tony Wei
Hi Becket, I found that those transactions tended to fail with InvalidTxnStateException if they never sent any records but were committed after some brokers had been restarted. Because the failing state transition was always from EMPTY to COMMIT, I ran a job with a parallelism of one, with or wit

MapState not support transfer value

2019-09-19 Thread shengjk1
Hi all, this is my code:

.process(new KeyedProcessFunction<…, String>() {
    ...
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7))
            .cleanupInBackground()
            .setUpdateType(Stat

Re: Batch mode with Flink 1.8 unstable?

2019-09-19 Thread Fabian Hueske
Hi Ken, Changing the parallelism can affect the generation of input splits. I had a look at BinaryInputFormat, and it adds a bunch of empty input splits if the number of generated splits is less than the minimum number of splits (which is equal to the parallelism). See --> https://github.com/apac
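A toy model helps picture the padding Fabian describes. The `SplitPadding` class below is hypothetical and does not use Flink's `InputSplit` types. It represents each split by the number of bytes it covers, with 0 standing for an empty split. The point it shows is that raising the parallelism above the number of generated splits leaves some tasks with empty splits.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a split is its byte count, 0 means an empty split. Not Flink's API.
class SplitPadding {
    static List<Long> padSplits(List<Long> generated, int minNumSplits) {
        List<Long> splits = new ArrayList<>(generated);
        // Add empty splits until the minimum number of splits is reached.
        while (splits.size() < minNumSplits) {
            splits.add(0L);
        }
        return splits;
    }

    public static void main(String[] args) {
        int parallelism = 8; // the minimum number of splits equals the parallelism
        List<Long> splits = padSplits(List.of(100L, 100L, 50L), parallelism);
        System.out.println(splits); // [100, 100, 50, 0, 0, 0, 0, 0]
    }
}
```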

In yarn-cluster model, mapState not support transfer value

2019-09-19 Thread shengjk1
Hi all, as we know, a Java map lets you update a value in place through the reference returned by get, for example:

HashMap<String, List<String>> stringListHashMap = new HashMap<>();
for (int i = 0; i < 10; i++) {
    List<String> a = stringListHashMap.get("a" + i % 2);
    if (a == null) {
        a = new ArrayList<>();
        stringListHashMap.put("a" + i % 2, a);
    }
    a.add("a" + i);
}
stringListHash

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-19 Thread Zhu Zhu
Thanks everyone for the input. RestartStrategy customization is not recognized as a public interface because it is not explicitly documented. Since the feedback from this survey shows it is not used, I'll conclude that we do not need to support customized RestartStrategy for the new scheduler in Flink 1

Re: MapState not support transfer value

2019-09-19 Thread Dian Fu
Hi Shengjk1, you should call "mapState.put(key,objectList);" manually after calling "objectList.add(stringObjectTuple2.f1);" to write the change to the state backend. objectList is just an ordinary Java list object, so it is not synced to the state backend automatically when it is updated. Regards
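A toy state store that keeps values in serialized form shows why the explicit `put` is needed: every `get` returns a fresh copy. `SerializingMapState` below is hypothetical and is not Flink's `MapState`. Whether a real backend hands out a live reference or a deserialized copy depends on the backend (the RocksDB backend deserializes on every read), so code should not rely on in-place mutation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Hypothetical store that keeps values serialized, so get() returns an independent copy.
class SerializingMapState {
    private final Map<String, byte[]> store = new HashMap<>();

    void put(String key, ArrayList<String> value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            store.put(key, bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @SuppressWarnings("unchecked")
    ArrayList<String> get(String key) {
        byte[] bytes = store.get(key);
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (ArrayList<String>) in.readObject(); // a fresh copy on every call
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SerializingMapState mapState = new SerializingMapState();
        mapState.put("k", new ArrayList<>());

        ArrayList<String> objectList = mapState.get("k");
        objectList.add("a");                   // changes only the local copy
        System.out.println(mapState.get("k")); // []

        mapState.put("k", objectList);         // write the updated list back
        System.out.println(mapState.get("k")); // [a]
    }
}
```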

Re: In yarn-cluster model, mapState not support transfer value

2019-09-19 Thread Dian Fu
Hi Shengjk1, you should call "mapState.put(key,objectList);" manually after calling "objectList.add(stringObjectTuple2.f1);" to write the change to the state backend. objectList is just an ordinary Java list object, so it is not synced to the state backend automatically when it is updated. I gues

Re: In yarn-cluster model, mapState not support transfer value

2019-09-19 Thread shengjk1
Hi Dian, thanks for your reminder. I looked at HeapMapState.java and TtlMapState.java, and you are right. But as objectList gets large, calling "mapState.put(key,objectList);" every time hurts performance badly and can even lead to checkpoint timeouts. For now I do not have a better way to improve perf

Re: [External] Re: From Kafka Stream to Flink

2019-09-19 Thread Casado Tejedor, Rubén
Thanks, Fabian. @Hequn Cheng, could you share the status? Thanks for your amazing work! From: Fabian Hueske Date: Friday, August 16, 2019, 9:30 To: "Casado Tejedor, Rubén" CC: Maatary Okouya, miki haiat, user, Hequn Cheng Subject: Re: [External] Re: From Ka

Set flink-conf parameters dynamicly

2019-09-19 Thread Vasily Melnik
Hi all, I wonder whether it is possible to pass a custom flink-conf file as a parameter to the run utility, or to change configuration parameters manually in Java code. I'd like to change the metrics.scope parameter for each job independently. Best regards, Vasily Melnik

Re: Set flink-conf parameters dynamicly

2019-09-19 Thread Dian Fu
Hi Vasily Melnik, if you use YARN mode, you could try the option "-yD " when submitting the job [1] to change the configuration dynamically. [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html Re

Re: Set flink-conf parameters dynamicly

2019-09-19 Thread Vasily Melnik
Thanks, Dian! It works for me :) Best regards, Vasily Melnik On Thu, 19 Sep 2019 at 18:10, Dian Fu wrote: > Hi Vasily Melnik, > > If you use yarn mode, you could try to use the option "-yD value>" during submitting the job [1] to change the configuration > dynamically. > > [1] https://ci.apache

flink: keyed process function, why are timestamp of register event timer different as "on timer" function timestamp

2019-09-19 Thread Xiangyu Su
Hi User, we are using a keyed process function with event time in a Flink streaming application. We register an event-time timer in the "processElement" function, and noticed that the "onTimer" function received a different "timestamp" from the one registered in "processElement". If we understand correctly, "onTimer" shou

Re: [External] Re: From Kafka Stream to Flink

2019-09-19 Thread Hequn Cheng
Hi, Fabian is totally right. Big thanks for the detailed answers and nice examples above. As for the PR, very sorry about the delay. It is mainly because of the merge of Blink and my recent switch to working on Flink Python. However, I think later versions of Blink will cover this feature nativ

count distinct not supported in batch?

2019-09-19 Thread Fanbin Bu
Hi, I just found that count distinct is supported in streaming but not in batch (version 1.8). Is there any plan to add this to batch?

SELECT user_id,
       hop_end(created_at, interval '30' second, interval '30' second) as bucket_ts,
       count(distinct name)
FROM $table
GROUP BY user_id,
         hop(cr
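The query uses the same 30-second size and slide, so each HOP window behaves like a tumbling window. The plain-Java sketch below uses a hypothetical `HopCountDistinct` class to show what the aggregation computes: for each user and 30-second bucket, the number of distinct names. It is not a workaround for the missing batch support. It assumes epoch-aligned windows with slide equal to size, and treats hop_end as the exclusive end of the window.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustration of the query's result only; handles slide == size, epoch-aligned windows.
class HopCountDistinct {
    static final class Row {
        final String userId;
        final long createdAtMillis;
        final String name;

        Row(String userId, long createdAtMillis, String name) {
            this.userId = userId;
            this.createdAtMillis = createdAtMillis;
            this.name = name;
        }
    }

    // Returns "userId@bucketEnd" -> count(distinct name).
    static Map<String, Integer> countDistinct(List<Row> rows, long sizeMillis) {
        Map<String, Set<String>> names = new TreeMap<>();
        for (Row r : rows) {
            // With slide == size each row falls into exactly one window; this is its exclusive end.
            long bucketEnd = Math.floorDiv(r.createdAtMillis, sizeMillis) * sizeMillis + sizeMillis;
            names.computeIfAbsent(r.userId + "@" + bucketEnd, k -> new HashSet<>()).add(r.name);
        }
        Map<String, Integer> counts = new TreeMap<>();
        names.forEach((k, v) -> counts.put(k, v.size()));
        return counts;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("u1", 1_000, "a"),
                new Row("u1", 2_000, "a"),
                new Row("u1", 3_000, "b"),
                new Row("u1", 31_000, "a"),
                new Row("u2", 5_000, "c"));
        System.out.println(countDistinct(rows, 30_000)); // {u1@30000=2, u1@60000=1, u2@30000=1}
    }
}
```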

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-19 Thread Steven Wu
We do use a config like "restart-strategy: org.foobar.MyRestartStrategyFactoryFactory", mainly to add metrics beyond the ones Flink provides. On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu wrote: > Thanks everyone for the input. > > The RestartStrategy customization is not recognized as a public

Re: flink: keyed process function, why are timestamp of register event timer different as "on timer" function timestamp

2019-09-19 Thread Dian Fu
Hi Xiangyu, what do you mean by "should be triggered by using same time"? The timestamp passed to the onTimer callback is not the watermark time; it is the timestamp that was registered in the call to registerEventTimeTimer. Regards, Dian > On Sep 19, 2019, at 11:29 PM, Xiangyu Su wrote: > > Hi User, > > We ar
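The toy event-time timer service below illustrates Dian's point. It is not Flink's implementation, and `ToyTimerService` is a hypothetical name. A timer fires once the watermark reaches its timestamp, and the callback receives the registered timestamp even when a single watermark jump fires several timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy event-time timer service, not Flink's implementation.
class ToyTimerService {
    // A set: registering the same timestamp twice yields one timer
    // (Flink likewise keeps at most one timer per key and timestamp).
    private final TreeSet<Long> timers = new TreeSet<>();
    final List<Long> firedTimestamps = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    void advanceWatermark(long watermark) {
        // Fire, in order, every timer whose timestamp is <= the new watermark.
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // Receives the registered timestamp, not the watermark.
    void onTimer(long timestamp) {
        firedTimestamps.add(timestamp);
    }

    public static void main(String[] args) {
        ToyTimerService service = new ToyTimerService();
        service.registerEventTimeTimer(1_000);
        service.registerEventTimeTimer(2_500);
        service.advanceWatermark(3_000);             // one watermark jump fires both timers
        System.out.println(service.firedTimestamps); // [1000, 2500]
    }
}
```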

Re: How to implement grouping set in stream

2019-09-19 Thread Dian Fu
AFAIK, grouping sets are already supported for streaming in the Blink planner. You could check FLINK-12192 for details. Regards, Dian > On Sep 10, 2019, at 6:51 PM, 刘建刚 wrote: > > I want to implement grouping set in stream. I am new to flink sql. I > want to find a example to teach me how to self

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-19 Thread Zhu Zhu
Thanks, Steven, for the feedback! Could you share more information about the metrics you add in your customized restart strategy? Thanks, Zhu Zhu On Fri, Sep 20, 2019 at 7:11 AM, Steven Wu wrote: > We do use config like "restart-strategy: > org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional > m

Re: count distinct not supported in batch?

2019-09-19 Thread JingsongLee
Hi Fanbin, this is "distinct aggregates for group windows" in batch SQL mode. Currently it is not supported by either the legacy planner or the Blink planner. There is no clear plan yet, but if the demand is strong, we can consider supporting it. Best, Jingsong Lee

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-19 Thread Tony Wei
Hi, to find out why `Error.NOT_COORDINATOR` happened in the broker, I set Flink's log level for the producer to DEBUG and found some logs on the Flink side regarding this error. Below is a log snippet. It seems that the producer client didn't catch this error and retry to find the new coordinator. T

Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-19 Thread Tony Wei
Hi, I found that the Kafka source code [1] always checks whether `newPartitionsInTransaction` is empty before calling `enqueueRequest(addPartitionsToTransactionHandler())`, but the Flink Kafka producer code [2] does not apply this check. I wrote a simple producer with the `flushNewPartitions` co
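The check Tony describes is an emptiness guard placed before the request is enqueued. The class below is a hypothetical toy model: its names echo the thread, but it is neither Kafka's `TransactionManager` nor Flink's producer. It only shows that, with the guard in place, flushing a transaction with no new partitions enqueues nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model of the guard; not Kafka's or Flink's actual classes.
class ToyTransactionManager {
    private final Set<String> newPartitionsInTransaction = new TreeSet<>();
    final List<Set<String>> enqueuedAddPartitionsRequests = new ArrayList<>();

    void addPartition(String topicPartition) {
        newPartitionsInTransaction.add(topicPartition);
    }

    // Enqueue an "add partitions" request only when there is something to add,
    // so a transaction that never sent records issues no such request.
    void flushNewPartitions() {
        if (newPartitionsInTransaction.isEmpty()) {
            return;
        }
        enqueuedAddPartitionsRequests.add(new TreeSet<>(newPartitionsInTransaction));
        newPartitionsInTransaction.clear();
    }
}
```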

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Debasish Ghosh
I think what you are pointing at is asynchronous DataStream operations. In our case we want to submit the entire job in a Future. Something like the following:

def execute(..) = {
  // this does all data stream manipulation, joins etc.
  buildComputationGraph()
  // submits for execution with S

Re: Batch mode with Flink 1.8 unstable?

2019-09-19 Thread Till Rohrmann
Good to hear that some of your problems have been solved, Ken. For the UTFDataFormatException, it is hard to tell. Usually it says that the input has been produced using `writeUTF`. Could you maybe provide an example program which reproduces the problem? Moreover, it would be helpful to see how the i
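`java.io` raises `UTFDataFormatException` in two standard cases, both sketched below. First, `DataOutputStream.writeUTF` rejects strings whose modified-UTF-8 encoding exceeds 65535 bytes. Second, `DataInputStream.readUTF` rejects bytes that are not valid `writeUTF` output. This excerpt does not show which of these, if either, applies to Ken's job, and `UtfDemo` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.io.UncheckedIOException;

class UtfDemo {
    static byte[] writeUtf(String s) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(s); // 2-byte length prefix followed by modified UTF-8
        }
        return bytes.toByteArray();
    }

    static String readUtf(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF();
        }
    }

    static String roundTrip(String s) {
        try {
            return readUtf(writeUtf(s));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Case 1: the encoded string does not fit the 2-byte length prefix.
    static boolean writeFailsForLongString() {
        try {
            writeUtf("a".repeat(70_000));
            return false;
        } catch (UTFDataFormatException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Case 2: the bytes were not produced by writeUTF.
    static boolean readFailsForMalformedBytes() {
        byte[] bogus = {0x00, 0x01, (byte) 0x80}; // length 1, then a stray continuation byte
        try {
            readUtf(bogus);
            return false;
        } catch (UTFDataFormatException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("h\u00e9llo"));
        System.out.println(writeFailsForLongString());
        System.out.println(readFailsForMalformedBytes());
    }
}
```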

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Biao Liu
Hi Debasish, I think something critical about your usage is hidden. It might help if you could provide more details. It still confuses me how you solved the serialization issue: why do the non-transient fields only affect serialization in a Future? WRT this ProgramAbortException issue, do you sub

Re: Running flink examples

2019-09-19 Thread Biao Liu
Hi, I guess the specified input (--input /path/to/input) didn't work. I just checked the PageRank example program; it accepts "--pages" and "--links" as input parameters. Thanks, Biao /'bɪ.aʊ/ On Thu, 19 Sep 2019 at 14:56, Vijay Bhaskar wrote: > Can you check whether its able to read the supp

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Debasish Ghosh
We solved the serialization problem by making transient some things that were being captured as part of the closure, so we no longer get serialization errors. Everything works properly without the Future. I realize that, because of statics, concurrent job submission will be an issue. But we are
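The transient fix works like this. Java serialization skips `transient` fields, so a non-serializable helper held in one no longer breaks serialization of the enclosing object. The field comes back as `null` after deserialization, however, and has to be recreated. The classes below are hypothetical stand-ins, not the classes from this thread. In a Flink rich function the usual place to create such a field is `open()`; the lazy re-creation here just keeps the sketch free of Flink dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical dependency that is NOT Serializable (think: a client or connection).
class NonSerializableClient {
    String call(String s) {
        return "client:" + s;
    }
}

// Hypothetical function object that gets shipped as part of a closure.
class MyFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    final String name;                      // serialized as usual
    transient NonSerializableClient client; // skipped by Java serialization

    MyFunction(String name) {
        this.name = name;
        this.client = new NonSerializableClient();
    }

    String apply(String in) {
        // After deserialization the transient field is null, so re-create it lazily.
        if (client == null) {
            client = new NonSerializableClient();
        }
        return client.call(name + ":" + in);
    }
}

class TransientDemo {
    // Serialize and deserialize, as happens when a closure is shipped to the cluster.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj); // would throw NotSerializableException if `client` were not transient
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MyFunction copy = roundTrip(new MyFunction("f"));
        System.out.println(copy.client == null); // true
        System.out.println(copy.apply("x"));     // client:f:x
    }
}
```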