Re: serialization issue in streaming job run with scala Future

2019-09-18 Thread Rafi Aroch
Hi Debasish,

Have you taken a look at the AsyncIO API for running async operations? I
think this is the preferred way of doing it. [1]
So it would look something like this:

class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent
        requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext =
        ExecutionContext.fromExecutor(Executors.directExecutor())

    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String =>
                resultFuture.complete(Iterable((str, result)))
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(),
        1000, TimeUnit.MILLISECONDS, 100)


Thanks,
Rafi

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api

On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh 
wrote:

> ok, the above problem was due to some serialization issues which we fixed
> by marking some of the things transient. This fixes the serialization
> issues .. But now when I try to execute in a Future I hit upon this ..
>
>
> *java.util.concurrent.ExecutionException: Boxed Error* at
> scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
> at
> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at scala.concurrent.Promise.complete(Promise.scala:53)
> at scala.concurrent.Promise.complete$(Promise.scala:52)
> at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at
> java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>
> *Caused by:
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*
> at
> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> at
> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
> at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
> at scala.util.Success.$anonfun$map$1(Try.scala:255)
> at scala.util.Success.map(Try.scala:213)
> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
> ... 7 more
>
> I found this issue in JIRA
> https://issues.apache.org/jira/browse/FLINK-10381 which is still open and
> talks about a related issue. But we are not submitting multiple jobs - we
> are just submitting 1 job but async in a Future. I am not clear why this
> should create the problem that I see.
>
> Can anyone please help with an explanation ?
>
> regards.
>
> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh 
> wrote:
>
>> I think the issue may not be linked with Future. What happens is when
>> this piece of code is executed ..
>>
>> val rides: DataStream[TaxiRide] =
>>   readStream(inTaxiRide)
>> .filter { ride ⇒ ride.getIsStart().booleanValue }
>> .keyBy("rideId")
>>
>> val fares: DataStream[TaxiFare] =
>>   readStream(inTaxiFare)
>> .keyBy("rideId")
>>
>> val processed: DataStream[TaxiRideFare] =
>>   rides
>> .connect(fares)
>> .flatMap(new EnrichmentFunction)
>>
>> somehow the ClosureCleaner gets executed as evident from the following
>> which tries to serialize Avro data. Is there any way to pass the custom
>> avro serializer that I am using ?
>>
>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>> 

Re: Property based testing

2019-09-18 Thread Indraneel R
Oh great! Thanks, Aaron that was quite clear.
I will give it a try!

On Wed, Sep 18, 2019 at 8:29 PM Aaron Levin  wrote:

> Hey,
>
> I've used ScalaCheck to test flink applications. Basic idea is:
>
> * use ScalaCheck to generate some kind of collection
> * use `fromCollection` in `StreamExecutionEnvironment` to create a
> `DataStream`
> * use `DataStreamUtils.collect` as a sink
> * plug my flink logic between the collection source and the collection sink
> * make a ScalaCheck property assertion based on the input collection and
> output collection.
>
> Possible to wrap all that in a single method in Scala. LMK if you have any
> more questions or any of this was not clear!
>
> (note: not sure how to do this in Java).
>
> Best,
>
> Aaron Levin
>
> On Wed, Sep 18, 2019 at 8:36 AM Indraneel R 
> wrote:
>
>> Hi All,
>>
>> Is there any property based testing framework for flink like
>> 'SparkTestingBase'  for spark?
>>
>> Would also be useful to know what are some of the standard testing
>> practices for data testing for flink pipelines.
>>
>> regards
>> -Indraneel
>>
>


Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-18 Thread Tony Wei
Hi Becket,

One more thing: I have tried to restart other brokers without the active
controller, but this exception can still happen as well. So it should be independent
of the active controller, like you said.

Best,
Tony Wei

Tony Wei wrote on Wed, Sep 18, 2019 at 6:14 PM:

> Hi Becket,
>
> I have reproduced this problem in our development environment. Below is
> the log message with debug level.
> Seems that the exception was from broker-3, and I also found other error
> code in broker-2 during the time.
>
> There are other INVALID_TXN_STATE errors for other transaction ids; I just
> list one of them. The above log messages only
> show messages with the `kafka-sink--eba862242e60de7e4744f3307058f865-7's`
> substring before `2019-09-18 07:14`.
>
> I didn't see any other information to find out why the producer tried to move the
> transaction state from EMPTY to COMMIT, and what
> made NOT_COORDINATOR happen. Do you have any thoughts about what's
> happening? Thanks.
>
> *Number of Kafka brokers: 3*
> *logging config for kafka:*
>
>> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
>>
>> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
>> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
>> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m
>> (%c)%n
>> log4j.appender.transactionAppender.MaxFileSize=10MB
>> log4j.appender.transactionAppender.MaxBackupIndex=10
>> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
>> log4j.additivity.kafka.coordinator.transaction=true
>>
>
>
> *flink-ui*
>>
>> Timestamp: 2019-09-18, 07:13:43
>>
>
>
> java.lang.RuntimeException: Error while confirming checkpoint
>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of
>> transactions failed, logging first encountered failure
>> at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>> ... 5 more
>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
>> producer attempted a transactional operation in an invalid state
>>
>
>
> *broker-3*
>>
>> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3]
>> TransactionalId: blacklist -> Sink:
>> kafka-sink--eba862242e60de7e4744f3307058f865-7's state is Empty, but
>> received transaction marker result to send: COMMIT
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting
>> append of COMMIT to transaction log with coordinator and returning
>> INVALID_TXN_STATE error to client for blacklist -> Sink:
>> kafka-sink--eba862242e60de7e4744f3307058f865-7's EndTransaction request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3]
>> TransactionalId: blacklist -> Sink:
>> kafka-sink--eba862242e60de7e4744f3307058f865-7's state is Empty, but
>> received transaction marker result to send: COMMIT
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting
>> append of COMMIT to transaction log with coordinator and returning
>> INVALID_TXN_STATE error to client for blacklist -> Sink:
>> kafka-sink--eba862242e60de7e4744f3307058f865-7's EndTransaction request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating
>> blacklist -> Sink: kafka-sink--eba862242e60de7e4744f3307058f865-7's
>> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4,
>> txnTimeoutMs=540, txnState=Empty, topicPartitions=Set(),
>> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with
>> coordinator epoch 4 for blacklist -> Sink:
>> kafka-sink--eba862242e60de7e4744f3307058f865-7 succeeded
>> (kafka.coordinator.transaction.TransactionStateManager)
>>
>
> *broker-2*
>
>> [2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating
>> blacklist -> Sink: kafka-sink--eba862242e60de7e4744f3307058f865-7's
>> transaction state to TxnTransitMetadata(producerId=7019, produc
>> erEpoch=0, 

Re: Client for Monitoring API!

2019-09-18 Thread Biao Liu
Hi Anis,

Have you tried the Flink metric reporters? They are a better way to handle metrics
than going through the REST API.
Flink supports reporting metrics to external systems. You can find the
list of supported external systems here [1].

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter
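
For example, enabling a reporter is only a flink-conf.yaml change. A minimal
sketch for the Prometheus reporter (the port range is just an illustrative
choice, and the flink-metrics-prometheus jar must be on the classpath):

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249-9250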

Thanks,
Biao /'bɪ.aʊ/



On Wed, 18 Sep 2019 at 19:36, Anis Nasir  wrote:

> Hey all,
>
> Is there any client library that we can use to fetch/store the metrics
> exposed through the Flink monitoring REST API?
>
>
> Regards,
> Anis
>


Too many OutputFlusher threads created

2019-09-18 Thread 李杰
flink 1.7.0
on YARN mode
=
The job has been running for about a week. Monitoring found that the thread count of the YarnTaskExecutorRunner had climbed to around 10,000, while normally it should be under 400.
A thread dump shows a large number (close to 10,000) of OutputFlusher threads in the TIMED_WAITING (sleeping) state:
"OutputFlusher for  -> " #1017018 daemon prio=5 os_prio=0
tid=0x01bb6000 nid=0x894a sleeping[0x2b0f6c42b000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at
org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:161)

   Locked ownable synchronizers:
- None

Has anyone run into a similar situation?


Re: Flink on yarn use jar on hdfs

2019-09-18 Thread Yang Wang
Hi shengnan,

Sorry for the late reply. I will attach a PR to FLINK-13938 this week.
If we specify the shared lib (-ysl), none of the jars located in the lib
directory of the Flink client will be uploaded.
Instead, we will use the HDFS path to set the LocalResource of YARN,
and the visibility of the LocalResource will be public, so that the distributed
cache can be shared by all containers,
even across different applications. We have used this in production and found that it
speeds up the launch of both the JobManager and the TaskManager.

Thanks,
Yang

Shengnan YU wrote on Mon, Sep 16, 2019 at 3:07 PM:

>
> Also, could you please share your GitHub account with me? I am interested in
> following you to see how you implement this feature. Thank you.
> On 9/16/2019 14:44, Yang Wang wrote:
>
> Hi Shengnan,
>
> I think you mean to avoid uploading flink-dist jars in submission every
> time.
> I have created a JIRA[1] to use Yarn public cache to speed up the launch
> duration of JM and TM. After this feature merged, you could submit a flink
> job like below.
>
> ./bin/flink run -d -m yarn-cluster -p 20 -ysl 
> hdfs:///flink/release/flink-1.9.0/lib examples/streaming/WindowJoin.jar
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-13938
>
> Shengnan YU wrote on Mon, Sep 16, 2019 at 2:24 PM:
>
>> Hi everyone!
>> I found that every time I start a Flink-on-YARN application, the client ships the
>> flink uber jar and other dependencies to HDFS and starts the AppMaster. Is there
>> any approach to place the flink uber jar and the other library jars on HDFS and
>> have only the configuration file shipped, so that YARN can start the
>> Flink AppMaster using jars at a fixed location in HDFS? Thank you very much!
>>
>>


Re: Batch mode with Flink 1.8 unstable?

2019-09-18 Thread Ken Krugler
Hi Till,

I tried out 1.9.0 with my workflow, and I am no longer running into the errors 
I described below, which is great!

Just to recap, this is batch, per-job mode on YARN/EMR.

Though I did run into a new issue, related to my previous problem when reading 
files written via SerializedOutputFormat.

I would always get errors that look like:

2019-09-16 20:58:21,396 ERROR com.company.MyWorkflow  - Exception reading from 
split #100 of file 's3://path-to-file/19' from 0 (state 28683/156308, block 
size 67108864)
2019-09-16 20:58:21,397 ERROR org.apache.flink.runtime.operators.BatchTask  
- Error in task code:  CHAIN DataSource (at 
makePreparedDataSet(com.company.MyWorkflow.java:67) 
(com.company.MyWorkflow$AdTextInputFormat)) -> Map (Key Extractor) (4/12)
java.io.UTFDataFormatException: malformed input around byte 51
at java.io.DataInputStream.readUTF(DataInputStream.java:656)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at com.company.AdText.read(AdText.java:170)
at 
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
at 
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
at 
org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
at 
com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:90)
at 
com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:71)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

Which would imply (again) an issue with the read block size not being the same 
as what was used to write it.
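
(A sketch of one way to rule that out, assuming the job can configure the input
format directly; `AdText` is the record type from the stack trace, and
setBlockSize() comes from BinaryInputFormat. The writing job would have to use
the same value.)

import org.apache.flink.api.common.io.SerializedInputFormat
import org.apache.flink.api.scala._

// Pin the read-side block size to the 64 MB the files were written with,
// instead of letting it be derived differently per workflow / parallelism.
val format = new SerializedInputFormat[AdText]
format.setFilePath("s3://path-to-file")
format.setBlockSize(64 * 1024 * 1024L) // 67108864, as in the error above
val adTexts: DataSet[AdText] = env.createInput(format)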

But I’d run this same data through a different workflow, without any issues.

When I reduced the read parallelism of the failing workflow to match the 
succeeding workflow (was 12, dropped it to 4), the errors went away.

So…don’t know what’s the root issue, but I have a workaround for now.

Though it’s a reproducible problem, which I’d like to use to help solve the 
problem.

Any suggestions for how to debug further?

Thanks,

— Ken

 
> On Jul 1, 2019, at 2:57 AM, Till Rohrmann  wrote:
> 
> Hi Ken,
> 
> in order to further debug your problems it would be helpful if you could 
> share the log files on DEBUG level with us.
> 
> For problem (2), I suspect that it has been caused by Flink releasing TMs too 
> early. This should be fixed with FLINK-10941 which is part of Flink 1.8.1. 
> The 1.8.1 release should be released very soonish. It would be great if you 
> could try your program with this version or even the 1.8.1 RC to see whether 
> the problem still occurs. But it could also be caused by using fine grained 
> recovery. So it might be worth a try to disable this feature if you turned it 
> on.
> 
> Thanks a lot!
> 
> Cheers,
> Till
> 
> On Thu, Jun 27, 2019 at 8:30 AM Biao Liu wrote:
> Hi Ken again,
> 
> In regard to TimeoutException, I just realized that there is no 
> akka.remote.OversizedPayloadException in your log file. There might be some 
> other reason that caused this.
> 1. Have you ever tried increasing the configuration "akka.ask.timeout"? 
> 2. Have you ever checked the garbage collection of JM/TM? Maybe you need to 
> enable printing GC log first.
> 
> 
> Biao Liu <mmyy1...@gmail.com> wrote on Thu, Jun 27, 2019 at 11:38 AM:
> Hi Ken,
> 
> In regard to oversized input splits, it seems to be a rare case beyond my 
> expectation. However, it should definitely be fixed, since input splits can be 
> user-defined. We should not assume they must be small. 
> I agree with Stephan that maybe there is something unexpectedly involved in 
> the input splits.
> There is also a workaround to solve this before we fix it: 
> increase the limit on the RPC message size by explicitly configuring 
> "akka.framesize" in flink-conf.yaml.
> 
> To @Qi, also sorry to hear about your bad experience. I'll take this issue, but I'm 
> not sure I can catch up with the 1.9 release. Hope things go well.
> 
> 
> Stephan Ewen <se...@apache.org> wrote on Wed, Jun 26, 2019 at 10:50 PM:
> Hi Ken!
> 
> Sorry to hear you are going through this experience. The major focus on 
> streaming so far means that the DataSet API has stability issues at scale.
> So, yes, batch mode in current Flink version can be somewhat tricky.
> 
> It is a big focus of Flink 1.9 to finally fix the batch mode, by 
> addressing batch-specific scheduling, recovery, and shuffle issues.
> 
> Let me go through the issues you found:
> 
> (1) Input splits and oversized RPC
> 
> Your explanation seems correct, timeout due to dropping oversized RPC message.
> 
> I don't quite understand how that exactly happens, because the size limit is 
> 10 MB 

Re: serialization issue in streaming job run with scala Future

2019-09-18 Thread Debasish Ghosh
ok, the above problem was due to some serialization issues which we fixed
by marking some of the things transient. This fixes the serialization
issues .. But now when I try to execute in a Future I hit upon this ..


*java.util.concurrent.ExecutionException: Boxed Error* at
scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at
scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.complete(Promise.scala:53)
at scala.concurrent.Promise.complete$(Promise.scala:52)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at
java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

*Caused by:
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*
at
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at
pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
... 7 more

I found this issue in JIRA
https://issues.apache.org/jira/browse/FLINK-10381 which
is still open and talks about a related issue. But we are not submitting
multiple jobs - we are just submitting 1 job but async in a Future. I am
not clear why this should create the problem that I see.

Can anyone please help with an explanation ?

regards.

On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh 
wrote:

> I think the issue may not be linked with Future. What happens is when this
> piece of code is executed ..
>
> val rides: DataStream[TaxiRide] =
>   readStream(inTaxiRide)
> .filter { ride ⇒ ride.getIsStart().booleanValue }
> .keyBy("rideId")
>
> val fares: DataStream[TaxiFare] =
>   readStream(inTaxiFare)
> .keyBy("rideId")
>
> val processed: DataStream[TaxiRideFare] =
>   rides
> .connect(fares)
> .flatMap(new EnrichmentFunction)
>
> somehow the ClosureCleaner gets executed as evident from the following
> which tries to serialize Avro data. Is there any way to pass the custom
> avro serializer that I am using ?
>
> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
> serializable. The object probably contains or references non serializable
> fields.
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
> at
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
> at
> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
> at
> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
> at
> pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
> at
> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
> at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
> at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
> at 

changing flink/kafka configs for stateful flink streaming applications

2019-09-18 Thread Abrar Sheikh
Hey all,

One of the known limitations of Spark stateful streaming applications is that
we cannot alter Spark configurations or Kafka configurations after the
first run of the stateful streaming application; this has been explained
well in
https://www.linkedin.com/pulse/upgrading-running-spark-streaming-application-code-changes-prakash/

Is this also something stateful Flink applications share in common with
Spark?

Thanks,

-- 
Abrar Sheikh


Re: Extending Flink's SQL-Parser

2019-09-18 Thread Rong Rong
Hi Dominik,

To add to Rui's answer: other examples of how to
extend Calcite's DDL syntax can be found in Calcite's server module [1] and in
one of our open-sourced projects [2]. You might want to check them out.

--
Rong

[1]
https://github.com/apache/calcite/blob/master/server/src/main/codegen/includes/parserImpls.ftl
[2]
https://github.com/uber/AthenaX/blob/master/athenax-vm-compiler/src/codegen/includes/parserImpls.ftl

On Mon, Sep 16, 2019 at 8:28 PM Rui Li  wrote:

> Hi Dominik,
>
> I think you can check "parserImpls.ftl" to find out how Flink extends
> Calcite's original syntax to support features like CREATE TABLE and DROP
> TABLE, and follow those examples to implement your own syntax. It may also
> be helpful to check the pom.xml of flink-sql-parser to see how we use
> javacc plugin to generate the parser code.
> At the moment I don't think there's any tutorials about extending the SQL
> parser because it's quite internal to Flink. But perhaps the following
> answer provides some insights about how to extend Calcite parser in
> general:
> https://stackoverflow.com/questions/44382826/how-to-change-calcites-default-sql-grammar
>
> On Tue, Sep 17, 2019 at 12:16 AM
> dominik.werner.groenin...@student.uni-augsburg.de <
> dominik.werner.groenin...@student.uni-augsburg.de> wrote:
>
>> Hey there,
>>
>>
>>
>> I have to extend Flink's SQL-parser such that it accepts and evaluates
>> select-queries with different syntax.
>>
>> Furthermore, I use Eclipse Oxygen with the Maven plugin and Flink release 1.8.0.
>>
>>
>>
>> What I believe to know:
>>
>> For parsing SQL-queries Flink-Table uses Apache Calcite's SQL-parser.
>> Flink-Table-Planner is the only module that references the Calcite-Core
>> which contains the parser ("Parser.jj" ?).
>>
>> Therefore I want to import Flink-Table-Planner and Calcite-Core as local
>> projects in Eclipse and edit the files "config.fmpp" and "parserImpls.ftl".
>> After that I want to create a new "Parser.jj" file with Apache Freemaker (I
>> assume there are some tutorials?).
>>
>>
>>
>> What I don't know:
>>
>> Is it a promising plan or are there better strategies to extend the
>> parser?
>>
>> I already tried to import Flink-Table-Planner but I got many errors which
>> might refer to a Scala-problem with Eclipse. Do I have to switch to say
>> IntelliJ? Furthermore I'm not yet clear about how exactly I can extend the
>> parser. Are there any manuals/tutorials to teach me adding a new
>> SELECT-syntax? I already came across the parser extension test but it
>> didn't give me the answers I was looking for.
>>
>>
>>
>> Thanks for your help!
>>
>>
>>
>> Regards,
>>
>> Dominik Gröninger
>>
>>
>> 
>>
>
>
> --
> Best regards!
> Rui Li
>


Re: Property based testing

2019-09-18 Thread Aaron Levin
Hey,

I've used ScalaCheck to test flink applications. Basic idea is:

* use ScalaCheck to generate some kind of collection
* use `fromCollection` in `StreamExecutionEnvironment` to create a
`DataStream`
* use `DataStreamUtils.collect` as a sink
* plug my flink logic between the collection source and the collection sink
* make a ScalaCheck property assertion based on the input collection and
output collection.

Possible to wrap all that in a single method in Scala. LMK if you have any
more questions or any of this was not clear!

(note: not sure how to do this in Java).
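
A minimal sketch of that recipe (assuming ScalaCheck is on the test classpath;
`myFlinkLogic` stands in for whatever transformation is under test):

import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._
import org.scalacheck.{Gen, Properties}
import org.scalacheck.Prop.forAll

object PipelineProps extends Properties("pipeline") {

  // the logic under test -- here just a trivial map, for illustration only
  def myFlinkLogic(in: DataStream[Int]): DataStream[Int] = in.map(_ * 2)

  property("doubles every element") =
    forAll(Gen.nonEmptyListOf(Gen.chooseNum(-1000, 1000))) { xs =>
      val env = StreamExecutionEnvironment.createLocalEnvironment(1)
      val input: DataStream[Int] = env.fromCollection(xs)
      val output = myFlinkLogic(input)

      // DataStreamUtils.collect acts as the sink and runs the pipeline
      val result = DataStreamUtils.collect(output.javaStream).asScala.toList

      result.sorted == xs.map(_ * 2).sorted
    }
}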

Best,

Aaron Levin

On Wed, Sep 18, 2019 at 8:36 AM Indraneel R  wrote:

> Hi All,
>
> Is there any property based testing framework for flink like
> 'SparkTestingBase'  for spark?
>
> Would also be useful to know what are some of the standard testing
> practices for data testing for flink pipelines.
>
> regards
> -Indraneel
>


Re: Client for Monitoring API!

2019-09-18 Thread Felipe Gutierrez
yes.

you can use prometheus+Grafana.
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter
https://felipeogutierrez.blogspot.com/2019/04/monitoring-apache-flink-with-prometheus.html

Felipe

On 2019/09/18 11:36:37, Anis Nasir  wrote:
> Hey all,
>
> Is there any client library that we can use to fetch/store the metrics
> exposed through the Flink monitoring REST API?
>
>
> Regards,
> Anis
>


Re: Question about computing Top-N with Flink

2019-09-18 Thread Jark Wu
Hi,

Thanks for the feedback. This should be a mistake. I have created an issue to track this problem:
https://issues.apache.org/jira/browse/FLINK-14119 


> On Sep 18, 2019, at 16:58, 董 加强 wrote:
> 
> Hi all,
> 
> Recently I needed to compute Top-N with Flink and ran into some problems; I wonder whether anyone else has encountered them. The SQL statement used to compute the Top-N has the following form:
> 
> create stream input table raw_log (
>  country STRING,
>  domain STRING,
>  flux LONG,
>  request LONG,
>  rowtime AS ROWTIME(request, "2 SECOND")
> ) USING kafka (
>  kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
>  startingOffsets = earliest, subscribe = "input"
> ) ROW FORMAT JSON; create stream output table top_n_result USING kafka (
>  kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
>  topic = "output"
> ) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view 
> window_log as
> select
>  TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart,
>  country,
>  domain,
>  sum(flux) as flux
> from
>  raw_log
> group by
>  TUMBLE(rowtime, INTERVAL '2' SECOND),
>  country,
>  domain; insert into top_n_result
> SELECT
>  *
> FROM
>  (
>SELECT
>  *,
>  ROW_NUMBER() OVER (
>PARTITION BY wStart
>ORDER BY
>  flux desc
>  ) AS row_num
>FROM
>  window_log
>  )
> WHERE
>  row_num <= 10;
> 
> The query is an event-time window aggregation followed by a Top-N computation, running on the Blink planner of Flink 1.9.
> The Top-N computation partitions by the window start time and then sorts to emit the top results.
> 
> This creates a state-management problem: because the window computation keeps moving forward, using the window start time as the partition key makes the state grow without bound.
> During later testing we found that the underlying implementation is RetractableTopNFunction, and we did not find any state-cleanup logic in that implementation, whereas
> 
> AppendOnlyTopNFunction and UpdatableTopNFunction do contain state-cleanup logic. Why is it implemented this way?
> Can state cleanup be implemented in RetractableTopNFunction, while guaranteeing that state is deleted safely?



Re: Time Window Flink SQL join

2019-09-18 Thread Fabian Hueske
Hi Nishant,

You should model the query as a join with a time-versioned table [1].
The bad-ips table would be the time-versioned table [2].
Since it is a time-versioned table, it could even be updated with new IPs.

This type of join will only keep the time-versioned table (the bad-ips) in
state, and not the other (high-volume) table.

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
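
A rough sketch of what that looks like with the Table API (tEnv, the
`update_time` attribute, and the registration of `BadIps` as a temporal table
function are all assumptions; a SQL Client setup would differ):

import org.apache.flink.table.api.scala._

// badips with a time attribute and `ip` as the key
val badIpsTable = tEnv.sqlQuery("SELECT ip, update_time FROM badips")
val badIps = badIpsTable.createTemporalTableFunction('update_time, 'ip)
tEnv.registerFunction("BadIps", badIps)

// only the (small) bad-ips side is kept in state, not the Kafka stream
tEnv.sqlUpdate(
  """
    |INSERT INTO sourceKafkaMalicious
    |SELECT k.*
    |FROM sourceKafka AS k,
    |     LATERAL TABLE (BadIps(k.timestamp_received)) AS b
    |WHERE k.`source.ip` = b.ip
    |""".stripMargin)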

On Wed, Sep 18, 2019 at 14:34, Nishant Gupta <nishantgupta1...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for your reply
> I have a continuous Kafka stream coming in and a static table of bad IPs. I
> wanted to segregate the records that have a bad IP.
>
> That is why I was joining them. But with that, the 60 GB of memory runs out.
>
> So I used the query below.
> Can you please guide me in this regard?
>
> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> The query that you wrote is not a time-windowed join.
>>
>> INSERT INTO sourceKafkaMalicious
>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>> sourceKafka.`source.ip`=badips.ip
>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
>> '15' MINUTE AND CURRENT_TIMESTAMP;
>>
>> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
>> (or event time) attribute of badips.
>>
>> What exactly are you trying to achieve with the query?
>>
>> Best, Fabian
>>
>> On Wed, Sep 18, 2019 at 14:02, Nishant Gupta <nishantgupta1...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I am running a query for Time Window Join as below
>>>
>>> INSERT INTO sourceKafkaMalicious
>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>> sourceKafka.`source.ip`=badips.ip
>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>
>>> With a time-windowed join, Flink SQL should automatically clear older records. Somehow
>>> the query does not clear the heap space and fails with an error after
>>> some time.
>>>
>>> Can you please let me know what could go wrong, or whether it is an issue.
>>>
>>> Environment File chunks
>>>
>>> --
>>> tables:
>>>   - name: sourceKafka
>>> type: source-table
>>> update-mode: append
>>> connector:
>>>   type: kafka
>>>   version: "universal"
>>>   topic: test-data-flatten
>>>   properties:
>>> - key: zookeeper.connect
>>>   value: x.x.x.x:2181
>>> - key: bootstrap.servers
>>>   value: x.x.x.x:9092
>>> - key: group.id
>>>   value: testgroup
>>> format:
>>>   type: json
>>>   fail-on-missing-field: false
>>>   json-schema: >
>>> {
>>>   type: 'object',
>>>   properties: {
>>> 'source.ip': {
>>>type: 'string'
>>> },
>>> 'source.port': {
>>>type: 'string'
>>> }
>>>   }
>>> }
>>>   derive-schema: false
>>> schema:
>>>   - name: ' source.ip '
>>> type: VARCHAR
>>>   - name: 'source.port'
>>> type: VARCHAR
>>>
>>>   - name: sourceKafkaMalicious
>>> type: sink-table
>>> update-mode: append
>>> connector:
>>>   type: kafka
>>>   version: "universal"
>>>   topic: test-data-mal
>>>   properties:
>>> - key: zookeeper.connect
>>>   value: x.x.x.x:2181
>>> - key: bootstrap.servers
>>>   value: x.x.x.x:9092
>>> - key: group.id
>>>   value: testgroupmal
>>> format:
>>>   type: json
>>>   fail-on-missing-field: false
>>>   json-schema: >
>>> {
>>>   type: 'object',
>>>   properties: {
>>> 'source.ip': {
>>>type: 'string'
>>> },
>>> 'source.port': {
>>>type: 'string'
>>> }
>>>   }
>>> }
>>>   derive-schema: false
>>> schema:
>>>   - name: ' source.ip '
>>> type: VARCHAR
>>>   - name: 'source.port'
>>> type: VARCHAR
>>>
>>>   - name: badips
>>> type: source-table
>>> #update-mode: append
>>> connector:
>>>   type: filesystem
>>>   path: "/home/cyanadmin/ipsum/levels/badips.csv"
>>> format:
>>>   type: csv
>>>   fields:
>>> - name: ip
>>>   type: VARCHAR
>>>   comment-prefix: "#"
>>> schema:
>>>   - name: ip
>>> type: VARCHAR
>>>
>>> execution:
>>>   planner: blink
>>>   type: streaming
>>>   time-characteristic: event-time
>>>   periodic-watermarks-interval: 200
>>>   result-mode: table
>>>   max-table-result-rows: 100
>>>   parallelism: 3
>>>   max-parallelism: 128
>>>   

Property based testing

2019-09-18 Thread Indraneel R
Hi All,

Is there any property based testing framework for flink like
'SparkTestingBase'  for spark?

Would also be useful to know what are some of the standard testing
practices for data testing for flink pipelines.

regards
-Indraneel


Re: Time Window Flink SQL join

2019-09-18 Thread Nishant Gupta
Hi Fabian,

Thanks for your reply
I have a continuous Kafka stream coming in and a static table of bad IPs. I
wanted to segregate the records that have a bad IP.

That is why I was joining them. But with that, the 60 GB of memory runs out.

So I used the query below.
Can you please guide me in this regard?

On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske  wrote:

> Hi,
>
> The query that you wrote is not a time-windowed join.
>
> INSERT INTO sourceKafkaMalicious
> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
> sourceKafka.`source.ip`=badips.ip
> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
> '15' MINUTE AND CURRENT_TIMESTAMP;
>
> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
> (or event time) attribute of badips.
>
> What exactly are you trying to achieve with the query?
>
> Best, Fabian
>
> On Wed, Sep 18, 2019 at 14:02, Nishant Gupta <nishantgupta1...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I am running a query for Time Window Join as below
>>
>> INSERT INTO sourceKafkaMalicious
>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>> sourceKafka.`source.ip`=badips.ip
>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
>> '15' MINUTE AND CURRENT_TIMESTAMP;
>>
>> With a time-windowed join, Flink SQL should automatically clear older records. Somehow
>> the query does not clear the heap space and fails with an error after
>> some time.
>>
>> Can you please let me know what could go wrong, or whether it is an issue.
>>
>> Environment File chunks
>>
>> --
>> tables:
>>   - name: sourceKafka
>> type: source-table
>> update-mode: append
>> connector:
>>   type: kafka
>>   version: "universal"
>>   topic: test-data-flatten
>>   properties:
>> - key: zookeeper.connect
>>   value: x.x.x.x:2181
>> - key: bootstrap.servers
>>   value: x.x.x.x:9092
>> - key: group.id
>>   value: testgroup
>> format:
>>   type: json
>>   fail-on-missing-field: false
>>   json-schema: >
>> {
>>   type: 'object',
>>   properties: {
>> 'source.ip': {
>>type: 'string'
>> },
>> 'source.port': {
>>type: 'string'
>> }
>>   }
>> }
>>   derive-schema: false
>> schema:
>>   - name: ' source.ip '
>> type: VARCHAR
>>   - name: 'source.port'
>> type: VARCHAR
>>
>>   - name: sourceKafkaMalicious
>> type: sink-table
>> update-mode: append
>> connector:
>>   type: kafka
>>   version: "universal"
>>   topic: test-data-mal
>>   properties:
>> - key: zookeeper.connect
>>   value: x.x.x.x:2181
>> - key: bootstrap.servers
>>   value: x.x.x.x:9092
>> - key: group.id
>>   value: testgroupmal
>> format:
>>   type: json
>>   fail-on-missing-field: false
>>   json-schema: >
>> {
>>   type: 'object',
>>   properties: {
>> 'source.ip': {
>>type: 'string'
>> },
>> 'source.port': {
>>type: 'string'
>> }
>>   }
>> }
>>   derive-schema: false
>> schema:
>>   - name: ' source.ip '
>> type: VARCHAR
>>   - name: 'source.port'
>> type: VARCHAR
>>
>>   - name: badips
>> type: source-table
>> #update-mode: append
>> connector:
>>   type: filesystem
>>   path: "/home/cyanadmin/ipsum/levels/badips.csv"
>> format:
>>   type: csv
>>   fields:
>> - name: ip
>>   type: VARCHAR
>>   comment-prefix: "#"
>> schema:
>>   - name: ip
>> type: VARCHAR
>>
>> execution:
>>   planner: blink
>>   type: streaming
>>   time-characteristic: event-time
>>   periodic-watermarks-interval: 200
>>   result-mode: table
>>   max-table-result-rows: 100
>>   parallelism: 3
>>   max-parallelism: 128
>>   min-idle-state-retention: 0
>>   max-idle-state-retention: 0
>>   restart-strategy:
>> type: fallback
>>
>> configuration:
>>   table.optimizer.join-reorder-enabled: true
>>   table.exec.spill-compression.enabled: true
>>   table.exec.spill-compression.block-size: 128kb
>> # Properties that describe the cluster to which table programs are submitted.
>>
>> deployment:
>>   response-timeout: 5000
>>
>>
>> --
>>
>


Re: Time Window Flink SQL join

2019-09-18 Thread Fabian Hueske
Hi,

The query that you wrote is not a time-windowed join.

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
sourceKafka.`source.ip`=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
'15' MINUTE AND CURRENT_TIMESTAMP;

The problem is the use of CURRENT_TIMESTAMP instead of a processing time
(or event time) attribute of badips.
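
For reference, a time-windowed (interval) join compares time attributes of the
two streams themselves, along the lines of this sketch (table and column names
are only illustrative, and both columns must be declared as event-time or
processing-time attributes):

SELECT s.*
FROM sourceKafka AS s, badipEvents AS b
WHERE s.`source.ip` = b.ip
  AND s.timestamp_received BETWEEN b.update_time - INTERVAL '15' MINUTE
                               AND b.update_time;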

What exactly are you trying to achieve with the query?

Best, Fabian

On Wed, Sep 18, 2019 at 14:02, Nishant Gupta <nishantgupta1...@gmail.com> wrote:

> Hi Team,
>
> I am running a query for Time Window Join as below
>
> INSERT INTO sourceKafkaMalicious
> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
> sourceKafka.`source.ip`=badips.ip
> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
> '15' MINUTE AND CURRENT_TIMESTAMP;
>
> With a time-windowed join, Flink SQL should automatically clear older records. Somehow
> the query does not clear the heap space and fails with an error after
> some time.
>
> Can you please let me know what could go wrong, or whether it is an issue.
>
> Environment File chunks
>
> --
> tables:
>   - name: sourceKafka
> type: source-table
> update-mode: append
> connector:
>   type: kafka
>   version: "universal"
>   topic: test-data-flatten
>   properties:
> - key: zookeeper.connect
>   value: x.x.x.x:2181
> - key: bootstrap.servers
>   value: x.x.x.x:9092
> - key: group.id
>   value: testgroup
> format:
>   type: json
>   fail-on-missing-field: false
>   json-schema: >
> {
>   type: 'object',
>   properties: {
> 'source.ip': {
>type: 'string'
> },
> 'source.port': {
>type: 'string'
> }
>   }
> }
>   derive-schema: false
> schema:
>   - name: ' source.ip '
> type: VARCHAR
>   - name: 'source.port'
> type: VARCHAR
>
>   - name: sourceKafkaMalicious
> type: sink-table
> update-mode: append
> connector:
>   type: kafka
>   version: "universal"
>   topic: test-data-mal
>   properties:
> - key: zookeeper.connect
>   value: x.x.x.x:2181
> - key: bootstrap.servers
>   value: x.x.x.x:9092
> - key: group.id
>   value: testgroupmal
> format:
>   type: json
>   fail-on-missing-field: false
>   json-schema: >
> {
>   type: 'object',
>   properties: {
> 'source.ip': {
>type: 'string'
> },
> 'source.port': {
>type: 'string'
> }
>   }
> }
>   derive-schema: false
> schema:
>   - name: ' source.ip '
> type: VARCHAR
>   - name: 'source.port'
> type: VARCHAR
>
>   - name: badips
> type: source-table
> #update-mode: append
> connector:
>   type: filesystem
>   path: "/home/cyanadmin/ipsum/levels/badips.csv"
> format:
>   type: csv
>   fields:
> - name: ip
>   type: VARCHAR
>   comment-prefix: "#"
> schema:
>   - name: ip
> type: VARCHAR
>
> execution:
>   planner: blink
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 100
>   parallelism: 3
>   max-parallelism: 128
>   min-idle-state-retention: 0
>   max-idle-state-retention: 0
>   restart-strategy:
> type: fallback
>
> configuration:
>   table.optimizer.join-reorder-enabled: true
>   table.exec.spill-compression.enabled: true
>   table.exec.spill-compression.block-size: 128kb
> # Properties that describe the cluster to which table programs are submitted.
>
> deployment:
>   response-timeout: 5000
>
>
> --
>


Time Window Flink SQL join

2019-09-18 Thread Nishant Gupta
Hi Team,

I am running a query for Time Window Join as below

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
sourceKafka.`source.ip`=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
'15' MINUTE AND CURRENT_TIMESTAMP;

With a time-windowed join, Flink SQL should automatically clear older records. Somehow
the query does not clear the heap space and fails with an error after
some time.

Can you please let me know what could go wrong, or whether it is an issue.

Environment File chunks
--
tables:
  - name: sourceKafka
type: source-table
update-mode: append
connector:
  type: kafka
  version: "universal"
  topic: test-data-flatten
  properties:
- key: zookeeper.connect
  value: x.x.x.x:2181
- key: bootstrap.servers
  value: x.x.x.x:9092
- key: group.id
  value: testgroup
format:
  type: json
  fail-on-missing-field: false
  json-schema: >
{
  type: 'object',
  properties: {
'source.ip': {
   type: 'string'
},
'source.port': {
   type: 'string'
}
  }
}
  derive-schema: false
schema:
  - name: ' source.ip '
type: VARCHAR
  - name: 'source.port'
type: VARCHAR

  - name: sourceKafkaMalicious
type: sink-table
update-mode: append
connector:
  type: kafka
  version: "universal"
  topic: test-data-mal
  properties:
- key: zookeeper.connect
  value: x.x.x.x:2181
- key: bootstrap.servers
  value: x.x.x.x:9092
- key: group.id
  value: testgroupmal
format:
  type: json
  fail-on-missing-field: false
  json-schema: >
{
  type: 'object',
  properties: {
'source.ip': {
   type: 'string'
},
'source.port': {
   type: 'string'
}
  }
}
  derive-schema: false
schema:
  - name: ' source.ip '
type: VARCHAR
  - name: 'source.port'
type: VARCHAR

  - name: badips
type: source-table
#update-mode: append
connector:
  type: filesystem
  path: "/home/cyanadmin/ipsum/levels/badips.csv"
format:
  type: csv
  fields:
- name: ip
  type: VARCHAR
  comment-prefix: "#"
schema:
  - name: ip
type: VARCHAR

execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 100
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  restart-strategy:
type: fallback

configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb
# Properties that describe the cluster to which table programs are submitted.

deployment:
  response-timeout: 5000

--


Client for Monitoring API!

2019-09-18 Thread Anis Nasir
Hey all,

Is there any client library that we can use to fetch/store the metrics
exposed through the Flink monitoring REST API?


Regards,
Anis


Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-18 Thread Tony Wei
Hi Becket,

I have reproduced this problem in our development environment. Below is the
log message with debug level.
Seems that the exception was from broker-3, and I also found other error
code in broker-2 during the time.

There are other INVALID_TXN_STATE errors for other transaction ids; I just
list one of them. The above log messages only
show messages with the `kafka-sink--eba862242e60de7e4744f3307058f865-7's`
substring before `2019-09-18 07:14`.

I didn't see any other information to find out why the producer tried to move the
transaction state from EMPTY to COMMIT, and what
made NOT_COORDINATOR happen. Do you have any thoughts about what's
happening? Thanks.

*Number of Kafka brokers: 3*
*logging config for kafka:*

> log4j.appender.transactionAppender=org.apache.log4j.RollingFileAppender
>
> log4j.appender.transactionAppender.File=${kafka.logs.dir}/kafka-transaction.log
> log4j.appender.transactionAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.transactionAppender.layout.ConversionPattern=[%d] %p %m
> (%c)%n
> log4j.appender.transactionAppender.MaxFileSize=10MB
> log4j.appender.transactionAppender.MaxBackupIndex=10
> log4j.logger.kafka.coordinator.transaction=DEBUG, transactionAppender
> log4j.additivity.kafka.coordinator.transaction=true
>


*flink-ui*
>
> Timestamp: 2019-09-18, 07:13:43
>


java.lang.RuntimeException: Error while confirming checkpoint
> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of
> transactions failed, logging first encountered failure
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
> ... 5 more
> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
> producer attempted a transactional operation in an invalid state
>


*broker-3*
>
> [2019-09-18 07:13:43,768] DEBUG [TransactionCoordinator id=3]
> TransactionalId: blacklist -> Sink:
> kafka-sink--eba862242e60de7e4744f3307058f865-7's state is Empty, but
> received transaction marker result to send: COMMIT
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:43,769] DEBUG [TransactionCoordinator id=3] Aborting
> append of COMMIT to transaction log with coordinator and returning
> INVALID_TXN_STATE error to client for blacklist -> Sink:
> kafka-sink--eba862242e60de7e4744f3307058f865-7's EndTransaction request
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3]
> TransactionalId: blacklist -> Sink:
> kafka-sink--eba862242e60de7e4744f3307058f865-7's state is Empty, but
> received transaction marker result to send: COMMIT
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:45,896] DEBUG [TransactionCoordinator id=3] Aborting
> append of COMMIT to transaction log with coordinator and returning
> INVALID_TXN_STATE error to client for blacklist -> Sink:
> kafka-sink--eba862242e60de7e4744f3307058f865-7's EndTransaction request
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2019-09-18 07:13:46,840] DEBUG [Transaction State Manager 3]: Updating
> blacklist -> Sink: kafka-sink--eba862242e60de7e4744f3307058f865-7's
> transaction state to TxnTransitMetadata(producerId=7019, producerEpoch=4,
> txnTimeoutMs=540, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568790826831) with
> coordinator epoch 4 for blacklist -> Sink:
> kafka-sink--eba862242e60de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
>

*broker-2*

> [2019-09-18 06:45:26,324] DEBUG [Transaction State Manager 2]: Updating
> blacklist -> Sink: kafka-sink--eba862242e60de7e4744f3307058f865-7's
> transaction state to TxnTransitMetadata(producerId=7019, produc
> erEpoch=0, txnTimeoutMs=540, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1568789126318) with
> coordinator epoch 0 for blacklist -> Sink: kafka-sink--eba862242e6
> 0de7e4744f3307058f865-7 succeeded
> (kafka.coordinator.transaction.TransactionStateManager)
> [2019-09-18 06:54:27,981] DEBUG [Transaction State Manager 2]: Updating
> 

Re: Difference between data stream window function and cep within

2019-09-18 Thread Joshua Fan
Hi Dian

Thank you for your explanation.
After having a look at the source code, I see that the CEP within simply applies a
time-interval check for each state.
Thank you.

Yours sincerely
Joshua

On Wed, Sep 18, 2019 at 9:41 AM Dian Fu  wrote:

> Hi Joshua,
>
> There is no tumbling/sliding window underlying the cep within
> implementation.
>
> The difference between datastream window and cep within is that:
> 1) Regarding to datastream window, the window is unified for all the
> elements (You can think that the window already exists before the input
> elements come). For example, for sliding window: (window size: 60s, slide
> size: 10s), then the windows will be [0s, 60s], [10s, 70s], [20s, 80s],
> etc. When the input elements come, they are put into the windows they
> belong to.
> 2) Regarding to cep within, it defines the maximum time interval for an
> event sequence to match the pattern. So a unified window is not suitable
> for this requirement. Regarding to the underlying implementation, for each
> matching/partial-matching sequence, the time interval between the first
> element and the last element of the sequence will be checked against the
> within interval. You can refer to [1] for details.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/459fd929399ad6c80535255eefa278564ec33683/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java#L251
>
>
> On Sep 17, 2019, at 7:47 PM, Joshua Fan wrote:
>
> Hi All,
>
> I'd like to know the difference between data stream window function and
> cep within, I googled this issue but found no useful information.
>
> Below the cep within, is there a tumbling window or sliding window or just
> a process function?
>
> Your explanation will be truly appreciated.
>
> Yours sincerely
>
> Joshua
>
>
>


Question about computing Top-N with Flink

2019-09-18 Thread 董 加强
Hi all,

Recently I needed to compute Top-N with Flink and ran into some problems; I wonder whether anyone else has encountered them. The SQL statement used to compute the Top-N has the following form:

create stream input table raw_log (
  country STRING,
  domain STRING,
  flux LONG,
  request LONG,
  rowtime AS ROWTIME(request, "2 SECOND")
) USING kafka (
  kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
  startingOffsets = earliest, subscribe = "input"
) ROW FORMAT JSON; create stream output table top_n_result USING kafka (
  kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}",
  topic = "output"
) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log 
as
select
  TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart,
  country,
  domain,
  sum(flux) as flux
from
  raw_log
group by
  TUMBLE(rowtime, INTERVAL '2' SECOND),
  country,
  domain; insert into top_n_result
SELECT
  *
FROM
  (
SELECT
  *,
  ROW_NUMBER() OVER (
PARTITION BY wStart
ORDER BY
  flux desc
  ) AS row_num
FROM
  window_log
  )
WHERE
  row_num <= 10;

The query is an event-time window aggregation followed by a Top-N computation, running on the Blink planner of Flink 1.9. The Top-N computation partitions by the window start time and then sorts to emit the top results.

This creates a state-management problem: because the window computation keeps moving forward, using the window start time as the partition key makes the state grow without bound. During later testing we found that the underlying implementation is RetractableTopNFunction, and we did not find any state-cleanup logic in that implementation, whereas AppendOnlyTopNFunction and UpdatableTopNFunction do contain state-cleanup logic.

Why is it implemented this way? Can state cleanup be implemented in RetractableTopNFunction, while guaranteeing that state is deleted safely?


Running flink examples

2019-09-18 Thread RAMALINGESWARA RAO THOTTEMPUDI
 Hi Sir,

I am trying to run the Flink example programs, particularly PageRank.

I have used the following command:

./bin/flink run -d ./examples/batch/PageRank.jar --input /path/to/input

It is running, but it is showing the ranking for only 15 elements of my data, whereas I 
need to find the ranking of all elements of my data.
This is because the original program runs only for a fixed number of iterations, 
which is 15. How can I modify it to run over all of the data elements?

I have to change the value of the fixed number of iterations.



Thanking You,

TR RAO


Re: Window metadata removal

2019-09-18 Thread gil bl
Hi Fabian,

Thank you for your reply. I'm not sure my question was clear enough, so I'll try to explain our scenario:

1. We are working in "event time" mode.
2. We want to handle 'late data' up to the last X days (for example, the last 7 days).
3. For each incoming event:
   * The event is aggregated using a window function.
   * When the window is "fired", the accumulated data is forwarded to a "sink" function and all data is purged from the window.
4. If late data arrives for the same window, the same logic (as in section 3) is applied: when the window is fired, the data is accumulated from scratch, sent to a "sink" and purged from the window.
5. We are not using the default trigger.

We expect the flow above to result in fragmented data, i.e. several outputs with the same <key, window> which aggregate different sets of events.

We encounter the following problem: since we have a huge number of different <key, window> combinations, the metadata (WindowOperator, InternalTimer) is kept in memory until the end of the 'allowed lateness' period. This causes our job to run out of memory.

Here is a calculation of the required memory consumption for the window metadata alone:
* Metadata size for each <key, window> is at least 64 bytes.
* If we have 200,000,000 <key, window> combinations per day and the allowed lateness is set to 7 days:
  200,000,000 * 64 * 7 = ~83 GB

For the scenario above the window metadata is useless. Is there a possibility to keep using the window API, set allowed lateness, and not keep the window metadata until the end of the allowed lateness period? (Maybe as a new feature?)

05.09.2019, 13:04, "Fabian Hueske" wrote:

> Hi,
> A window needs to keep the data as long as it expects new data. This is clearly the case before the end time of the window was reached. If my window ends at 12:30, I want to wait (at least) until 12:30 before I remove any data, right?
> In case you expect some data to be late, you can configure allowedLateness. Let's say we configure an allowedLateness of 10 minutes. In that case, Flink would keep the metadata of the window that closes at 12:30 until 12:40. The data is kept to be able to update the result of the window until allowedLateness has passed. If we for example receive a late record at 12:38, we can still update the result of the window because we kept all required data.
> If you don't need allowedLateness, don't configure it (the default is 0).
> Best, Fabian
>
> On Mon, Sep 2, 2019 at 16:46, gil bl wrote:
>> Hi, I'm interested in why metadata like WindowOperator and InternalTimer are being kept for windowSize + allowedLateness period per each pane.
>> What is the purpose of keeping this data if no new events are expected to enter the pane? Is there any way this metadata can be released earlier?