Re: Flink Task Allocation on Yarn

2018-11-06 Thread vino yang
Hi Marvin,

Can you tell me why you want to do this?

Thanks, vino.

Marvin777 wrote on Tue, Oct 30, 2018 at 10:24 AM:

> Hi vino,
>
> Our current production environment is on YARN. We do not want to incur
> the additional operation and maintenance cost of the Standalone mode.
>
> Is there any other way to make better use of the YARN cluster's resources,
> e.g. to spread tasks across containers on different nodes?
>
> Thanks, Marvin.
>
> vino yang wrote on Mon, Oct 29, 2018 at 3:04 PM:
>
>> Hi Marvin,
>>
>> YARN is a resource management and scheduling framework.
>> When you run Flink on YARN, Flink hands container scheduling over to YARN.
>> This is also the reason why YARN is used.
>> If you want to control the start and stop of TaskManagers, then I recommend
>> using standalone mode and setting the number of slots per node to 1.
>>
>> Thanks, vino.
>>
>> Marvin777 wrote on Mon, Oct 29, 2018 at 10:18 AM:
>>
>>> Hi all,
>>>
>>> When running on YARN, a node may contain more than one container. Is
>>> there a scheme for assigning tasks to different nodes?
>>>
>>> The version is 1.4.2.
>>>
>>> Thanks for your assistance.
>>>
>>


Re: Live configuration change

2018-11-06 Thread Ning Shi


> On Nov 6, 2018, at 4:22 PM, Elias Levy  wrote:
> 
> Also note that there is a pending PR to allow the Cassandra sink to back 
> pressure, so that the cluster does not get overwhelmed.

Yes, I’ve been following the development on that pull request. Unfortunately, 
we have to go live very soon so we can’t wait to leverage that, but it’s 
definitely a very nice feature to have.

Thanks,

—
Ning

Re: AvroInputFormat Serialisation Issue

2018-11-06 Thread Vinay Patil
Hi,

I am facing a similar issue today with Flink 1.6.0 - AvroOutputFormat

AvroOutputFormat<GenericRecord> tuple2AvroOutputFormat =
    new AvroOutputFormat<>(new Path(""), GenericRecord.class);

testDataSet
    .map(new GenerateGenericRecord())
    .returns(AvroTypeInfo.of(GenericRecord.class))
    .output(tuple2AvroOutputFormat);
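
For reference, the force-Avro switch is set on the execution config; in my job
it is enabled roughly like this (only the relevant lines, not the full setup):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();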

Following is the exception (I have enabled the forceAvro config, so I am not
sure why it still goes to the Kryo serializer):

com.esotericsoftware.kryo.KryoException: Error constructing instance of
class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at 
com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)

Please let me know if there is a fix for this issue as I have not faced this
problem for DataStreams.

Regards,
Vinay Patil




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Per job cluster doesn't shut down after the job is canceled

2018-11-06 Thread Paul Lam
Hi, 

I’m using Flink 1.5.3, and I’ve seen several times that the detached YARN 
cluster doesn’t shut down after the job is canceled successfully. The only 
errors I found in jobmanager’s log are as below (the second one appears 
multiple times):

```
2018-11-07 09:48:38,663 WARN  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Error while 
notifying JobStatusListener
java.lang.IllegalStateException: Incremented the completed number of 
checkpoints without incrementing the in progress checkpoints before.
at 
org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.incrementFailedCheckpoints(CheckpointStatsCounts.java:165)
at 
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.reportFailedCheckpoint(CheckpointStatsTracker.java:270)
at 
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.access$100(CheckpointStatsTracker.java:55)
at 
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$PendingCheckpointStatsCallback.reportFailedCheckpoint(CheckpointStatsTracker.java:314)
at 
org.apache.flink.runtime.checkpoint.PendingCheckpointStats.reportFailedCheckpoint(PendingCheckpointStats.java:184)
at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.reportFailedCheckpoint(PendingCheckpoint.java:517)
at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:454)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1200)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatusChange(ExecutionGraph.java:1713)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1370)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1354)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.cancel(ExecutionGraph.java:1000)
at 
org.apache.flink.runtime.jobmaster.JobMaster.cancel(JobMaster.java:389)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2018-11-07 09:54:52,420 ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Implementation 
error: Unhandled exception.
java.lang.IllegalArgumentException: Negative number of in progress checkpoints
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
at 
org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
at 
org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
at 
org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
at 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
at 
org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
at 

Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-06 Thread yinhua.dai
Hi Piotr,

Can you elaborate more on the solution with the custom operator?
I don't think there will be any records from the SQL query if no input data
is coming in within the time window, even if we convert the result to a
datastream.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Live configuration change

2018-11-06 Thread Elias Levy
Also note that there is a pending PR to allow the Cassandra sink to back
pressure, so that the cluster does not get overwhelmed.

On Tue, Nov 6, 2018 at 12:46 PM Ning Shi  wrote:

> > for rate limiting, would quota at Kafka brokers help?
>
> Thanks, Steven. This looks very promising. I'll try it out.
>
> --
> Ning
>


Re: Live configuration change

2018-11-06 Thread Ning Shi
> for rate limiting, would quota at Kafka brokers help?

Thanks, Steven. This looks very promising. I'll try it out.

--
Ning


Re: Live configuration change

2018-11-06 Thread Steven Wu
for rate limiting, would quota at Kafka brokers help?

On Tue, Nov 6, 2018 at 10:29 AM Ning Shi  wrote:

> On Tue, Nov 06, 2018 at 07:44:50PM +0200, Nicos Maris wrote:
> > Ning can you provide another example except for rate limiting?
>
> Our main use case and concern is rate limiting because without it we
> could potentially overwhelm downstream systems (Cassandra) when the job
> plays catch up or replay events from Kafka. The consequence of that is
> that exceptions will be thrown in the Cassandra sink causing the whole
> job to restart and end up in the same situation over and over.
>
> With that said, we do have other use cases such as doing canary
> deployment. We'd like to start processing events for a subset of users,
> then expand it to all if things look good. Without live configuration,
> we have to take a savepoint and restart the job, which will cause the
> job to play catch up at the beginning, potentially overwhelming
> downstream system without rate limiting.
>
> Hope the use cases described above clarify things.
>
> Thanks,
>
> --
> Ning
>


Re: Live configuration change

2018-11-06 Thread Ning Shi
On Tue, Nov 06, 2018 at 07:44:50PM +0200, Nicos Maris wrote:
> Ning can you provide another example except for rate limiting?

Our main use case and concern is rate limiting because without it we
could potentially overwhelm downstream systems (Cassandra) when the job
plays catch up or replay events from Kafka. The consequence of that is
that exceptions will be thrown in the Cassandra sink causing the whole
job to restart and end up in the same situation over and over.

With that said, we do have other use cases such as doing canary
deployment. We'd like to start processing events for a subset of users,
then expand it to all if things look good. Without live configuration,
we have to take a savepoint and restart the job, which will cause the
job to play catch up at the beginning, potentially overwhelming
downstream system without rate limiting.

Hope the use cases described above clarify things.

Thanks,

--
Ning


Flink Web UI does not show specific exception messages when job submission fails

2018-11-06 Thread Luis Gustavo Oliveira Silva
Hello,

I was using Flink 1.4.2 and when submitting jobs through the Web UI, I could
see exceptions that would help me debug jobs, such as:

We're sorry, something went wrong. The server responded with:
>
> java.util.concurrent.CompletionException: 
> org.apache.flink.util.FlinkException: Could not run the jar.
>   at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
> Source)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>   at java.util.concurrent.FutureTask.run(Unknown Source)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown
>  Source)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>   at java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
>   ... 9 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at 
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>   at 
> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
>   at 
> org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
>   at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
>   ... 8 more
> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
> Encountered "." at line 3, column 4.
> Was expecting one of:
> 
> "ORDER" ...
> "LIMIT" ...
> "OFFSET" ...
> "FETCH" ...
> "FROM" ...
> "," ...
> "UNION" ...
> "INTERSECT" ...
> "EXCEPT" ...
> "MINUS" ...
>
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:81)
>   at 
> org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:558)
>   at 
> com.stone.default.rule.Sandbox3$.delayedEndpoint$com$stone$default$rule$Sandbox3$1(Sandbox.scala:112)
>   at 
> com.stone.default.rule.Sandbox3$delayedInit$body.apply(Sandbox.scala:93)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at com.stone.default.rule.Sandbox3$.main(Sandbox.scala:93)
>   at com.stone.default.rule.Sandbox3.main(Sandbox.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>   at java.lang.reflect.Method.invoke(Unknown Source)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>   ... 13 more
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." 
> at line 3, column 4.
> Was expecting one of:
> 
> "ORDER" ...
> "LIMIT" ...
> "OFFSET" ...
> "FETCH" ...
> "FROM" ...
> "," ...
> "UNION" ...
> "INTERSECT" ...
> "EXCEPT" ...
> "MINUS" ...
>
>   at 
> org.apache.calcite.sql.parser.impl.SqlParserImpl.convertException(SqlParserImpl.java:350)
>   at 
> org.apache.calcite.sql.parser.impl.SqlParserImpl.normalizeException(SqlParserImpl.java:131)
>   at 
> org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:138)
>   at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:163)
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:77)
>   ... 30 more
> Caused by: org.apache.calcite.sql.parser.impl.ParseException: Encountered "." 
> at line 3, column 4.
> Was expecting one of:
> 
> "ORDER" ...
> "LIMIT" ...
> "OFFSET" ...
> "FETCH" ...
> "FROM" ...
> "," ...
> "UNION" ...
> "INTERSECT" ...
> "EXCEPT" ...
> "MINUS" ...
>
>   at 
> org.apache.calcite.sql.parser.impl.SqlParserImpl.generateParseException(SqlParserImpl.java:22557)
>   at 
> 

Re: Live configuration change

2018-11-06 Thread Nicos Maris
Ning can you provide another example except for rate limiting?

On Tue, Nov 6, 2018, 6:20 PM Piotr Nowojski wrote:

> Hi,
>
> Sorry, but none that I’m aware of. As far as I know, the only way to
> dynamically configure the Kafka source would be for you to copy and modify its
> code.
>
> Piotrek
>
> > On 6 Nov 2018, at 15:19, Ning Shi  wrote:
> >
> > In the job I'm implementing, there are a couple of configuration
> > variables that I want to change at runtime, such as rate limit at the
> > Kafka source. I know it's possible to use a control stream and join it
> > with the normal stream to configure things in certain operators, but
> > this doesn't work for the source. Is there any other way to configure
> > settings at runtime?
> >
> > Thanks,
> >
> > --
> > Ning
>
>


Re: Understanding checkpoint behavior

2018-11-06 Thread PranjalChauhan
Thank you for your response Piotr. I plan to upgrade to Flink 1.5.x early
next year.

Two follow-up questions for now. 

1. 
" When operator snapshots are taken, there are two parts: the synchronous
and the asynchronous parts. "
I understand that when the operator snapshot is being taken, the processing
of that operator is stopped, as taking the snapshot is the synchronous part. Is
there any other synchronous part in the snapshot / checkpoint process?


2. 
Based on the test I mentioned above, my understanding is that for a window
operator, when all events that belong to checkpoint N and the checkpoint
barrier N have been received by the window operator (but are still pending until
the window is triggered), then checkpoint barrier N is immediately emitted to the
sink operator (so the snapshot can be completed) while the events are still
pending evaluation by the window operator.

Can you please confirm my understanding as I was initially confused by the
following second statement (emits all pending outgoing records) under
Barriers section in this doc
https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/stream_checkpointing.html
?

"When an intermediate operator has received a barrier for snapshot n from
all of its input streams, it emits itself a barrier for snapshot n into all
of its outgoing streams."

" Once the last stream has received barrier n, the operator emits all
pending outgoing records, and then emits snapshot n barriers itself. "

 Thanks,
Pranjal



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using FlinkKinesisConsumer through a proxy

2018-11-06 Thread Vijay Balakrishnan
Hi Gordon,
This still didn't work :(

Tried a few combinations with:
kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
"proxyDomain", "...");

kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
"proxyHost", "http://.com");

kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
"proxyPort", "911");

kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
"proxyUsername", "...");

kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
"proxyPassword", "..");

kinesisConsumerConfig.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX  +
"nonProxyHosts", "


How does the FlinkKinesisProducer work so seamlessly through a proxy ?
TIA,
Vijay

On Thu, Oct 4, 2018 at 6:41 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> Since Flink 1.5, you should be able to set all available configurations on
> the ClientConfiguration through the consumer Properties (see FLINK-9188
> [1]).
>
> The way to do that would be to prefix the configuration you want to set
> with "aws.clientconfig" and add that to the properties, as such:
>
> ```
> Properties kinesisConsumerProps = new Properties();
> kinesisConsumerProps.setProperty("aws.clientconfig.proxyHost", ...);
> kinesisConsumerProps.setProperty("aws.clientconfig.proxyPort", ...);
> kinesisConsumerProps.setProperty("aws.clientconfig.proxyUsername", ...);
> ...
> ```
>
> Could you try that out and see if it works for you?
>
> I've also realized that this feature isn't documented very well, and have
> opened a ticket for that [2].
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-9188
> [2] https://issues.apache.org/jira/browse/FLINK-10492
>
> On Thu, Oct 4, 2018, 7:57 PM Aljoscha Krettek  wrote:
>
>> Hi,
>>
>> I'm looping in Gordon and Thomas, they might have some idea about how to
>> resolve this.
>>
>> Best,
>> Aljoscha
>>
>> On 3. Oct 2018, at 17:29, Vijay Balakrishnan  wrote:
>>
>> I have been trying, to no avail, all variations of java
>> -Dhttp.nonProxyHosts=..  -Dhttps.proxyHost=http://...
>> -Dhttps.proxyPort=911 -Dhttps.proxyUser= -Dhttps.proxyPassword=..
>> -Dhttp.proxyHost=http://.. -Dhttp.proxyPort=911 -Dhttp.proxyUser=...
>> -Dhttp.proxyPassword=... -jar .. after looking at the code in
>> com.amazonaws.ClientConfiguration
>>
>> On Tue, Oct 2, 2018 at 3:49 PM Vijay Balakrishnan 
>> wrote:
>>
>>> HI,
>>> How do I use FlinkKinesisConsumer using the Properties through a proxy ?
>>> Getting a Connection issue through the proxy.
>>> Works outside the proxy.
>>>
>>> Properties kinesisConsumerConfig = new Properties();
>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_REGION,
>>> region);
>>>
>>> if (local) {
>>>
>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
>>> accessKey);
>>>
>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
>>> secretKey);
>>> } else {
>>>
>>> kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
>>> "AUTO");
>>> }
>>>
>>> //only for Consumer
>>>
>>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
>>> "1");
>>>
>>> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
>>> "2000");
>>> FlinkKinesisConsumer>
>>> kinesisConsumer = new FlinkKinesisConsumer<>(
>>> "kinesisTopicRead", new Tuple2KinesisSchema(),
>>> kinesisConsumerConfig);
>>> TIA
>>>
>>
>>


Re: Parallelize an incoming stream into 5 streams with the same data

2018-11-06 Thread Vijay Balakrishnan
Cool, thanks! Hequn. I will try that approach.

Vijay

On Thu, Nov 1, 2018 at 8:18 PM Hequn Cheng  wrote:

> Hi Vijay,
>
> > I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow
> operation on the KeyedStream and then perform group operation on the
> resultant set to get total count etc.
>
> From your description, I think you can perform a TumblingEventTimeWindow
> first, something looks like:
>
>> // tumbling processing-time windows
>> input
>> .keyBy()
>> .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>> .();
>
> then, you can perform a windowAll after the TumblingEventTimeWindow to get
> the final total count.
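>
> A rough sketch of that two-step aggregation (MyKeySelector and PerKeyCountAgg
> are made-up names, just to show the structure):
>
> input
>     .keyBy(new MyKeySelector())             // e.g. selects the (f0, f1) pair
>     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>     .aggregate(new PerKeyCountAgg())        // per-key count for each window
>     .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>     .reduce((a, b) -> a + b);               // total count across all keys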
>
> Best,
> Hequn
>
>
>
> On Fri, Nov 2, 2018 at 6:20 AM Vijay Balakrishnan 
> wrote:
>
>> Thanks,Hequn.
>> If I have to do a TumblingWindow operation like:
>>
>> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
>> TimeUnit.SECONDS))
>>
>> I am not able to do that on the output of keyBy(..) which is a KeyedStream.
>>
>> I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow 
>> operation on the KeyedStream
>>
>> and then perform group operation on the resultant set to get total count etc.
>>
>> I am only able to do only 1 of keyBy or timeWindowAll as follows:
>>
>>
>> .keyBy(*d._1,d._2*)
>> .process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))
>>
>> OR
>>
>> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
>> TimeUnit.SECONDS))
>> .process(new WindowProcessing(FIVE_SECONDS))
>>
>>
>> Doing this doesn't seem to be too helpful as the keyBy KeyedStream is lost 
>> in the next step:
>>
>> .keyBy(*d._1,d._2*)
>> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
>> TimeUnit.SECONDS))
>> .process(new WindowProcessing(FIVE_SECONDS))
>>
>>
>> TIA,
>>
>> Vijay
>>
>>
>>
>> On Thu, Oct 25, 2018 at 6:31 PM Hequn Cheng  wrote:
>>
>>> Hi Vijay,
>>>
>>> Option 1 is the right answer. `keyBy1` and `keyBy2` contain all data in
>>> `inputStream`.
>>> Option 2 replicates all data to each task, while option 3 splits data
>>> into smaller groups without duplication.
>>>
>>> Best, Hequn
>>>
>>> On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan 
>>> wrote:
>>>
 Hi,
 I need to broadcast/parallelize an incoming stream(inputStream) into 5
 streams with the same data. Each stream is keyed by different keys to do
 various grouping operations on the set.

 Do I just use inputStream.keyBy(5 diff keys) and then just use the
 DataStream to perform windowing/grouping operations ?

 DataStream inputStream = ...
 DataStream keyBy1 = inputStream.keyBy((d) -> d._1);
 DataStream keyBy2 = inputStream.keyBy((d) -> d._2);

 DataStream out1Stream = keyBy1.flatMap(new Key1Function()); // do windowing/grouping operations in this function
 DataStream out2Stream = keyBy2.flatMap(new Key2Function()); // do windowing/grouping operations in this function

 out1Stream.print();
 out2Stream.addSink(new Out2Sink());

 Will this work ?

 Or do I use the keyBy Stream with a broadcast function like this:

 BroadcastStream broadCastStream = inputStream.broadcast(..);
 DataStream out1Stream = keyBy1.connect(broadCastStream)
     .process(new KeyedBroadcastProcessFunction...)

 DataStream out2Stream = keyBy2.connect(broadCastStream)
     .process(new KeyedBroadcastProcessFunction...)

 Or do I need to use split:

 SplitStream<Long> source = inputStream.split(new MyOutputSelector());
 source.select("").flatMap(new Key1Function()).addSink(out1Sink);
 source.select("").flatMap(new Key2Function()).addSink(out2Sink);


 static final class MyOutputSelector implements OutputSelector<Long> {
     List<String> outputs = new ArrayList<String>();
     public Iterable<String> select(Long value) {
         outputs.add("");
         return outputs;
     }
 }
 TIA,

>>>


Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-06 Thread Piotr Nowojski
Hi,

You might come up with some magical self join that could do the trick -
join/window join the aggregation result with itself and then aggregate it
again. I don’t know if that’s possible (probably you would need to write a custom
aggregate function) and it would be inefficient. It will be easier to convert the
result of your SQL query into a DataStream and process it with a simple/custom
DataStream operator.
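
For illustration, a rough sketch of that second approach (all names are made up
and the state/timer handling is simplified, so treat it as a starting point
rather than a complete solution):

// Convert the SQL result into a DataStream (assuming an append-only result):
DataStream<Row> results = tableEnv.toAppendStream(resultTable, Row.class);

results
    .keyBy(r -> 0)   // a single "global" key, or a real key if the result is per-key
    .process(new KeyedProcessFunction<Integer, Row, Row>() {

        private transient ValueState<Row> lastResult;

        @Override
        public void open(Configuration parameters) {
            lastResult = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastResult", Row.class));
        }

        @Override
        public void processElement(Row value, Context ctx, Collector<Row> out)
                throws Exception {
            lastResult.update(value);
            out.collect(value);
            // arm a timer one minute from now
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out)
                throws Exception {
            Row last = lastResult.value();
            if (last != null) {
                out.collect(last);   // no new data arrived: re-emit the last result
            }
            ctx.timerService().registerProcessingTimeTimer(timestamp + 60_000L);
        }
    });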

Piotrek

> On 5 Nov 2018, at 10:17, yinhua.dai  wrote:
> 
> We have a requirement to always trigger a calculation on a timer
> basis, e.g. every 1 minute.
> 
> *If there are records come in flink during the time window then calculate it
> with the normal way, i.e. aggregate for each record and getResult() at end
> of the time window.*
> 
> *If there are no records come in flink during the time window, then send the
> last calculated result.*
> 
> I know that Flink will not trigger the calculation in the second case (when
> no records come into the system during the time window). Is there a solution
> for me in Flink SQL?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



[ANNOUNCE] Weekly community update #45

2018-11-06 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #45. Please post any news and
updates you want to share with the community to this thread.

# First release candidate for Flink 1.7.0

The community has published the first release candidate for Flink 1.7.0
[0]. Please help the community by giving the RC some exposure and report
any problems you might encounter.

# External shuffle service

Zhijiang started a proposal to add support for an external shuffle service
to Flink [1]. Please join the discussion to learn more.

# Flink operators for Kubernetes

Anand from Lyft shared his work on a Flink Kubernetes operator with the
community. If you want to learn more visit this document [2].

# Flink SQL DDL Design

Shuyi kicked off a discussion about adding support for SQL DDL to Flink
[3]. Chime in if you want to voice your opinion.

# Enhancing flink scheduler by implementing blacklist mechanism

Yingjie proposed to add a blacklisting mechanism to Flink [4]. It will
allow blacklisting unreliable TaskManagers, which then won't be used for further
scheduling. That way, the job execution will be more reliable.

# Flip-23: Model serving

Boris published his work on the model serving [5] library on Github [6].
Check it out!

# Flink web UI based on Angular 7

Yadong shared a web UI used at Alibaba with the community [7]. It is a
rework of the existing web UI based on Angular 7.

# Flip-27: Refactor source interface

Aljoscha started discussing the design of the new source interface [8].

# Enhance Table API functionality

Xiaowei and Jincheng started discussing potential improvements to Flink's
Table API and streaming SQL [9, 10]. If you want to learn more about the
Table API and streaming SQL's future, then join this discussion.

[0]
https://lists.apache.org/thread.html/5383a813b21b67655f1982a48a5d131b213596c20343954f9ef53209@%3Cdev.flink.apache.org%3E
[1]
https://lists.apache.org/thread.html/a64497d24b839a4c84dc29ddcf8bf43a34f13984ffa79a9bd64e858c@%3Cdev.flink.apache.org%3E
[2]
https://docs.google.com/document/d/1_AITTq0fay-NwUIqNfqN0pum-erOmS_4kYZnlMxw6_o/edit?usp=gmail#heading=h.ge413lh374xj
[3]
https://lists.apache.org/thread.html/cb696438e9bf7ff2a44953c438135db3b68ff7f96cff59847df0867d@%3Cdev.flink.apache.org%3E
[4]
https://lists.apache.org/thread.html/c0a2057f8171e75734c15a3a45ca5177fce1e04a19b9a02ba064706c@%3Cdev.flink.apache.org%3E
[5]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
[6] https://github.com/FlinkML/flink-modelServer
[7] https://github.com/vthinkxie/flink-runtime-web
[8]
https://lists.apache.org/thread.html/70484d6aa4b8e7121181ed8d5857a94bfb7d5a76334b9c8fcc59700c@%3Cdev.flink.apache.org%3E
[9]
https://lists.apache.org/thread.html/a75f5d0a938333503a0f1881f800d37ba0ec662b44624d4be9c6fdd9@%3Cdev.flink.apache.org%3E
[10]
https://lists.apache.org/thread.html/881b34fe79991870c099132b4723dde882cffcfff8e9a1f5bbe92bee@%3Cdev.flink.apache.org%3E


Cheers,
Till


Kubernetes Job Cluster - Checkpointing with Parallelism > 1

2018-11-06 Thread Thad Truman
Hi all,

We are trying to configure checkpointing (RocksDB) for Flink job clusters in
k8s. As described here, we have a parallelism value that is used as the
-Dparallelism.default arg in the job manager template, as well as the replicas
value in the task manager template. For jobs where the parallelism value is set
to 1, checkpointing works great. But when we set the parallelism value to
anything > 1, we get the below error:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 30 ms. Slots 
required: 4, slots allocated: 1
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:534)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)


Any ideas on how we can remediate this?

Thanks,

Thad Truman | Software Engineer | Neovest, Inc.
A: 1145 S 800 E, Ste 310, Orem, UT 84097
T: +1 801 900 2480
E: ttru...@neovest.com


Support Desk: supp...@neovest.com / +1 800 433 4276




This email is confidential and subject to important disclaimers and conditions,
including on offers for purchase or sale of securities, accuracy and
completeness of information, viruses, confidentiality, legal privilege, and legal
entity disclaimers, available at
www.neovest.com/disclosures.html






Re: Understanding checkpoint behavior

2018-11-06 Thread Piotr Nowojski
Hi,

Checkpoint duration sync is only the time taken for the “synchronous” part
of taking a snapshot of your operator. Your 11m time probably comes from the
fact that, before this snapshot, the checkpoint barrier was stuck somewhere in your
pipeline for that amount of time while some record or bunch of records was being
processed.

If you write a simple function that only performs `Thread.sleep(new
Random().nextInt(360))` and nothing else, your checkpoints will be taking a
random amount of time, since snapshots cannot be taken while your function is
also executing some code. You can read about some of those concepts in the
documentation:

https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html
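
Spelled out, such a blocking user function would look roughly like this (a purely
illustrative sketch, not code from the job in question):

import org.apache.flink.api.common.functions.MapFunction;
import java.util.Random;

public class SlowMap implements MapFunction<String, String> {

    private final Random rnd = new Random();

    @Override
    public String map(String value) throws Exception {
        // While this sleep runs, the operator cannot react to an incoming
        // checkpoint barrier, so a barrier queued behind many such records
        // is delayed accordingly.
        Thread.sleep(rnd.nextInt(360));
        return value;
    }
}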

Piotrek

Btw, Flink 1.2.1 is a very old version that is not supported anymore. One reason to
upgrade is the improvements to the network stack in Flink 1.5.x, which were in
part aimed at reducing checkpoint duration.

> On 5 Nov 2018, at 21:33, PranjalChauhan  wrote:
> 
> Hi,
> 
> I am a new Flink user, currently using Flink 1.2.1. I am trying to
> understand how checkpoints actually work when Window operator is processing
> events.
> 
> My pipeline has the following flow where each operator's parallelism is 1.
> source -> flatmap -> tumbling window -> sink
> In this pipeline, I had configured the window to be evaluated every 1 hour
> (3600 seconds) and the checkpoint interval was 5 mins. The checkpoint
> timeout was set to 1 hour as I wanted the checkpoints to complete.
> 
> In my window function, the job makes an https call to another service, so the
> window function may take some time to evaluate/process all events.
> 
> Please refer the following image. In this case, the window was triggered at
> 23:00:00. Checkpoint 12 was triggered soon after that and I notice that
> checkpoint 12 takes long time to complete (compared to other checkpoints
> when window function is not processing events).
> 
>  
> 
> Following images shows checkpoint 12 details of window & sink operators.
> 
>  
> 
>  
> 
> I see that the time spent for checkpoint was actually just 5 ms & 8 ms
> (checkpoint duration sync) for window & sink operators. However, End to End
> Duration for checkpoint was 11m 12s for both window & sink operator.
> 
> Is this expected behavior? If yes, do you have any suggestion to reduce the
> end to end checkpoint duration?
> 
> Please let me know if any more information is needed.
> 
> Thanks.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Live configuration change

2018-11-06 Thread Piotr Nowojski
Hi,

Sorry, but none that I’m aware of. As far as I know, the only way to dynamically
configure the Kafka source would be for you to copy and modify its code.

Piotrek

> On 6 Nov 2018, at 15:19, Ning Shi  wrote:
> 
> In the job I'm implementing, there are a couple of configuration
> variables that I want to change at runtime, such as rate limit at the
> Kafka source. I know it's possible to use a control stream and join it
> with the normal stream to configure things in certain operators, but
> this doesn't work for the source. Is there any other way to configure
> settings at runtime?
> 
> Thanks,
> 
> --
> Ning



Run a Flink job: REST/ binary client

2018-11-06 Thread Flavio Pompermaier
Hi to all,
I'm using Flink 1.3.2. When executing a job using bin/flink run, everything
goes well.
When executing it via the job manager's REST service (/jars/:jarid/run), the job
writes to the sink but never returns from env.execute(), and all the code
after it is not executed.

Is this a known issue? Was it resolved in Flink 1.6.2?

Best,
Flavio


Re: Flink CEP Watermark Exception

2018-11-06 Thread Austin Cawley-Edwards
Hi Dawid,

Just back in the office. The platform we run on recently announced Flink
1.6.0 support, so we upgraded and haven't seen this problem arise again
yet! We believe it could have been the `equals` method falsely matching
different records in rare instances, though the upgrade to Flink 1.6.0
seemed to minimize those instances.

Thank you for your help,
Austin

On Fri, Nov 2, 2018 at 5:02 AM Dawid Wysakowicz 
wrote:

> Hi Austin,
>
> Could you provide jobmanager and taskmanager logs for a failed run? The
> exception you've posted is thrown during processing of messages, rather than
> during restoring, but you said it failed to restore checkpoint, how come it
> processes messages? Could you also describe exact conditions step by step
> when the "IllegalStateException: Could not find previous entry with key"
> happens?
>
> The first two issues regarding CEP you've linked concern very old Flink
> version (1.0.3), CEP library was heavily reworked since then and I would
> not look for any similarities in those cases.
>
> Best,
>
> Dawid
> On 01/11/2018 14:24, Austin Cawley-Edwards wrote:
>
> Hi Dawid,
>
> Thank you for your reply. I'm out for the next few days, so I hope you
> don't mind me cc'ing my team in here. We all really appreciate you and the
> rest of the people monitoring the mailing list.
>
>
> We've only seen this SharedBuffer problem in production, after sending
> around 20 GB of data through. In the Flink UI, we see the checkpoint status
> as:
>
>
> *Checkpoint failed: Checkpoint Coordinator is suspending. *
>
> It then tries to restore the last previously succeeded checkpoint, but
> cannot and throws the SharedBuffer exception. Our state size is around
> 200MB when it fails.
>
> Unfortunately, the platform we are running our cluster on only supports up
> to Flink 1.5. We will continue trying to find a reproducible example for
> you. I have found a few other people with a similar problem (attached at
> the bottom), but none seem to have been resolved.
>
> Our CEP setup looks generally like this:
> Pattern<AlertEvent, ?> pattern = Pattern.<AlertEvent>begin("alertOne")
> .where(new SimpleCondition<AlertEvent>() {
> @Override
> public boolean filter(AlertEvent event) {
> return event.level > 0;
> }
> })
> .next("alertTwo").subtype(AlertEvent.class)
> .where(new IterativeCondition<AlertEvent>() {
> @Override
> public boolean filter(AlertEvent subEvent,
> Context<AlertEvent> ctx) throws Exception {
> double surprisePercent = subEvent.surprise();
> Iterable<AlertEvent> previousEvents
> = ctx.getEventsForPattern("alertOne");
> for (AlertEvent prevEvent : previousEvents) {
> double prevSurprisePercent = prevEvent.surprise();
> if (prevSurprisePercent > surprisePercent) {
> return false;
> }
> }
> return true;
> }
> })
> .next("alertThree").subtype(AlertEvent.class)
> .where(new IterativeCondition<AlertEvent>() {
> @Override
> public boolean filter(AlertEvent subEvent,
> Context<AlertEvent> ctx) throws Exception {
> double surprisePercent = subEvent.surprise();
> Iterable<AlertEvent> previousEvents =
> ctx.getEventsForPattern("alertTwo");
> for (AlertEvent prevEvent : previousEvents) {
> double prevSurprisePercent = prevEvent.surprise();
> if (prevSurprisePercent > surprisePercent) {
> return false;
> }
> }
> return true;
> }});
> PatternStream<AlertEvent> alertPatternStream =
> CEP.pattern(alertEvents, pattern);
> DataStream<Alert> confirmedAlerts = alertPatternStream
> .select(new PatternSelectFunction<AlertEvent, Alert>() {
> private static final long serialVersionUID = 1L;
> @Override
> public Alert select(Map<String, List<AlertEvent>> patternIn) {
> List<AlertEvent> alertEventList = new ArrayList<>();
> // Create an alert composed of three escalating events
> alertEventList.add(patternIn.get("alertOne").get(0));
> alertEventList.add(patternIn.get("alertTwo").get(0));
> alertEventList.add(patternIn.get("alertThree").get(0));
> Alert confirmedAlert = new Alert(alertEventList);
> return confirmedAlert;
> }
> })
> .uid("confirmedAlerts")
> .returns(Alert.class)
> .keyBy("id");
>
> Once again thank you,
> Austin
>
>
> -
> https://stackoverflow.com/questions/36917569/flink-streaming-cep-could-not-find-previous-shared-buffer-entry-with-key
> -
> https://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAC27z=nd0sx_ogbh0vuvzwetcke2t40tbxht5huh8uyzdte...@mail.gmail.com%3E
>
> -
> https://mail-archives.apache.org/mod_mbox/flink-user/201702.mbox/%3CCAGr9p8CzT3=cr+cOas=3k0becmiybfv+kt1fxv_rf8xabtu...@mail.gmail.com%3E
>
>
> On Thu, Nov 1, 2018 at 4:44 AM Dawid Wysakowicz 
> wrote:
>
>> Hi Austin,
>>
>> Could you elaborate a bit more what do you mean by "after a checkpoint
>> fails", what is the reason why checkpoint fails? Would it be possible for
>> you to prepare some reproducible example for that problem? Finally, I would
>> also recommend trying out Flink 1.6.x, as we reworked the underlying
>> structure for CEP - SharedBuffer.
>>
>> Best,
>>
>> Dawid
>> On 30/10/2018 20:59, Austin Cawley-Edwards wrote:
>>
>> Following up, we are using Flink 1.5.0 and Flink-CEP 2.11.
>>
>> Thanks,
>> Austin
>>
>> On Tue, Oct 30, 2018 at 3:58 PM Austin Cawley-Edwards <
>> 

Live configuration change

2018-11-06 Thread Ning Shi
In the job I'm implementing, there are a couple of configuration
variables that I want to change at runtime, such as rate limit at the
Kafka source. I know it's possible to use a control stream and join it
with the normal stream to configure things in certain operators, but
this doesn't work for the source. Is there any other way to configure
settings at runtime?

Thanks,

--
Ning


Re: Report failed job submission

2018-11-06 Thread Flavio Pompermaier
Any idea about how to address this issue?

On Tue, Oct 16, 2018 at 11:32 AM Flavio Pompermaier 
wrote:

> Hi to all,
> which is the correct way to report back to the user a failure from a job
> submission in Flink?
> If everything is OK, the job run API returns the job id, but what if there
> are errors in parameter validation or some other problem?
> Which is the right way to report back to the user the job error detail
> (apart from throwing an Exception)?
>
> Best,
> Flavio
>


Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

2018-11-06 Thread LINZ, Arnaud
Hello,



In Flink 1.3, I was able to make a clean stop of an HA streaming application
just by ending the source “run()” method (with an ending condition).

I tried to update my code to Flink 1.6.2, but that is no longer working.



Even if there are no sources and no items to process, the cluster continues its
execution forever, with an infinite number of such messages:

Checkpoint triggering task Source: Custom Source (1/2) of job 
3b286f5344c50f0e466bb8ee79a2bb69 is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.



Why has this behavior changed? How am I supposed to stop a streaming execution 
from its own code now? Is https://issues.apache.org/jira/browse/FLINK-2111 of 
any use?



Thanks,

Arnaud






The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.


Re: Split one dataset into multiple

2018-11-06 Thread Fabian Hueske
You have to define a common type, like an n-ary Either type and return that
from your source / operator.
The resulting DataSet can be consumed by multiple FlatmapFunctions, each
extracting and forwarding one of the result types.
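
A rough sketch of that approach, with made-up class and field names (assuming
the three record types from your example):

// Wrapper type: exactly one of the three fields is set per record.
public class RecordWrapper {
    public Department dept;
    public Employee emp;
    public Address addr;
}

DataSet<RecordWrapper> all = env.createInput(new MyIteratorInputFormat());

DataSet<Department> departments = all
    .flatMap((RecordWrapper r, Collector<Department> out) -> {
        if (r.dept != null) {
            out.collect(r.dept);
        }
    })
    .returns(Department.class);

DataSet<Employee> employees = all
    .flatMap((RecordWrapper r, Collector<Employee> out) -> {
        if (r.emp != null) {
            out.collect(r.emp);
        }
    })
    .returns(Employee.class);

// ... and the same for Address; each of these DataSets can then get its own
// aggregations, joins and filters.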

Cheers, Fabian

On Tue, Nov 6, 2018 at 10:40 AM madan wrote:

> Hi Vino,
>
> Thank you for the suggestions. In my case I am using DataSet since the data is
> limited, and split/select is not available on the DataSet API.
> I doubt hash partitioning will work for me. By doing hash
> partitioning, I do not know which partition is holding which entity data (Dept,
> Emp in my example; and sometimes the hash might be the same for 2 different
> entities). And on that partition I need to apply some other
> transformations (based on the partition data), which is not possible using
> MapPartitionFunction.
>
> Please suggest if my understanding is wrong and the use case is achievable
> (a little example would be of great help).
>
> Thank you,
> Madan
>
> On Tue, Nov 6, 2018 at 12:03 PM vino yang  wrote:
>
>> Hi madan,
>>
>> I think you need to hash partition your records.
>> Flink supports hash partitioning of data.
>> The operator is keyBy.
>> If the value of your tag field is enumerable, you can also use
>> split/select to achieve your purpose.
>>
>> Thanks, vino.
>>
>> madan wrote on Mon, Nov 5, 2018 at 6:37 PM:
>>
>>> Hi,
>>>
>>> I have a custom iterator which gives data of multiple entities. For
>>> example, the iterator gives data of Department, Employee and Address. A record's
>>> entity type is identified by a field value. And I need to apply a different
>>> set of operations on each dataset. E.g., Department data may have
>>> aggregations, while Employee and Address data are simply joined together after
>>> some filtering.
>>>
>>> If I have different datasets for each entity type the job is easy. So I
>>> am trying to split incoming data to different datasets. What is the best
>>> possible way to achieve this ?
>>>
>>> *Iterator can be read only once.
>>>
>>>
>>> --
>>> Thank you,
>>> Madan.
>>>
>>
>
> --
> Thank you,
> Madan.
>


Re: Split one dataset into multiple

2018-11-06 Thread madan
Hi Vino,

Thank you for the suggestions. In my case I am using DataSet since the data is
limited, and split/select is not available on the DataSet API.
I doubt hash partitioning will work for me. By doing hash partitioning,
I do not know which partition is holding which entity data (Dept, Emp in my
example; and sometimes the hash might be the same for 2 different entities). And
on that partition I need to apply some other transformations (based on the
partition data), which is not possible using MapPartitionFunction.

Please suggest if my understanding is wrong and the use case is achievable
(a little example would be of great help).

Thank you,
Madan

On Tue, Nov 6, 2018 at 12:03 PM vino yang  wrote:

> Hi madan,
>
> I think you need to hash partition your records.
> Flink supports hash partitioning of data.
> The operator is keyBy.
> If the value of your tag field is enumerable, you can also use
> split/select to achieve your purpose.
>
> Thanks, vino.
>
> madan wrote on Mon, Nov 5, 2018 at 6:37 PM:
>
>> Hi,
>>
>> I have a custom iterator which gives data of multiple entities. For
>> example, the iterator gives data of Department, Employee and Address. A record's
>> entity type is identified by a field value. And I need to apply a different
>> set of operations on each dataset. E.g., Department data may have
>> aggregations, while Employee and Address data are simply joined together after
>> some filtering.
>>
>> If I have different datasets for each entity type the job is easy. So I
>> am trying to split incoming data to different datasets. What is the best
>> possible way to achieve this ?
>>
>> *Iterator can be read only once.
>>
>>
>> --
>> Thank you,
>> Madan.
>>
>

-- 
Thank you,
Madan.