Re: State Maintenance

2017-09-08 Thread Fabian Hueske
Only KeyedState can be used as queryable state. So you cannot query the
OperatorState.
AFAIK, it should not be a problem if an operator has OperatorState and
queryable KeyedState.
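
A minimal sketch of this combination (illustrative only, not from the thread; it assumes Flink 1.3 APIs, a function applied on a keyed stream, and simple String-typed records and state):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

// Sketch: queryable keyed MapState plus union-list operator state in one operator.
public class EnrichingFlatMap extends RichFlatMapFunction<String, String>
    implements CheckpointedFunction {

  private transient MapState<String, Long> userState;   // keyed state, exposed as queryable state
  private transient ListState<String> configState;      // union-list operator state
  private final List<String> localConfig = new ArrayList<>();

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    MapStateDescriptor<String, Long> userDesc =
        new MapStateDescriptor<>("user-state", String.class, Long.class);
    userDesc.setQueryable("user-state");
    userState = context.getKeyedStateStore().getMapState(userDesc);

    configState = context.getOperatorStateStore().getUnionListState(
        new ListStateDescriptor<>("config", String.class));
    for (String entry : configState.get()) {   // broadcast-style restore on every subtask
      localConfig.add(entry);
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    configState.clear();
    // All subtasks hold the same static state, so only subtask 0 writes it.
    if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
      for (String entry : localConfig) {
        configState.add(entry);
      }
    }
  }

  @Override
  public void flatMap(String value, Collector<String> out) throws Exception {
    userState.put(value, System.currentTimeMillis());   // toy update of the queryable state
    out.collect(value);
  }
}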

2017-09-07 17:01 GMT+02:00 Navneeth Krishnan :

> Will I be able to use both queryable MapState and union list state while
> implementing the CheckpointedFunction interface? Because one of my major
> requirement on that operator is to provide a queryable state and in order
> to compute that state we need the common static state across all parallel
> operator instances.
>
> Thanks.
>
> On Thu, Sep 7, 2017 at 12:44 AM, Fabian Hueske  wrote:
>
>> Hi Navneeth,
>>
>> there's a lower level state interface that should address your
>> requirements: OperatorStateStore.getUnionListState()
>>
>> This union list state is similar to the regular operator list state, but
>> instead of splitting the list for recovery and giving one split to each
>> operator instance, it restores the complete list on each operator instance.
>> So it basically does a broadcast restore. If all operators have the same
>> state, only one instance checkpoints its state and this state is restored
>> to all other instances in case of a failure. This should also work with
>> rescaling.
>> The operator instance to checkpoint can be identified by
>> (RuntimeContext.getIndexOfThisSubtask == 0).
>>
>> The OperatorStateStore is a bit hidden. You have to implement the
>> CheckpointedFunction interface. When CheckpointedFunction.initializ
>> eState(FunctionInitializationContext context) is called context has a
>> method getOperatorStateStore().
>>
>> I'd recommend to have a look at the detailed JavaDocs of all involved
>> classes and methods.
>>
>> Hope this helps,
>> Fabian
>>
>>
>> 2017-09-05 19:35 GMT+02:00 Navneeth Krishnan :
>>
>>> Thanks Gordon for your response. I have around 80 parallel flatmap
>>> operator instances and each instance requires 3 states. Out of which one is
>>> user state in which each operator will have unique user's data and I need
>>> this data to be queryable. The other two states are kind of static states
>>> which are only modified when there an update message in config stream. This
>>> static data could easily be around 2GB and in my previous approach I used
>>> operator state where the data is retrieved inside open method across all
>>> operator instances whereas checkpointed only inside one of the operator
>>> instance.
>>>
>>> One of the issue that I have is if I change the operator parallelism how
>>> would it affect the internal state?
>>>
>>>
>>> On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai >> > wrote:
>>>
 Hi Navneeth,

 Answering your three questions separately:

 1. Yes. Your MapState will be backed by RocksDB, so when removing an
 entry
 from the map state, the state will be removed from the local RocksDB as
 well.

 2. If state classes are not POJOs, they will be serialized by Kryo,
 unless a
 custom serializer is specifically specified otherwise. You can take a
 look
 at this document on how to do that [1].

 3. I might need to know more information to be able to suggest properly
 for
 this one. How are you using the "huge state values"? From what you
 described, it seems like you only need it on one of the parallel
 instances,
 so I'm a bit curious on what they are actually used for. Are they needed
 when processing your records?

 Cheers,
 Gordon

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.3/
 dev/stream/state.html#custom-serialization-for-managed-state
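
As a small sketch of the registration Gordon mentions here (the two class names below are placeholders, not real classes; the linked document covers custom serializers for managed state in more detail):

// Sketch: register a custom Kryo serializer for a non-POJO class that is kept in state.
// MyStateClass and MyKryoSerializer are placeholder names; MyKryoSerializer would
// extend com.esotericsoftware.kryo.Serializer<MyStateClass>.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(MyStateClass.class, MyKryoSerializer.class);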




>>>
>>>
>>
>


Re: Does Flink has a JDBC server where I can submit Calcite Streaming Queries?

2017-09-08 Thread Fabian Hueske
Hi Kant,

no, there is no such functionality.
I'm also not sure how well streaming would work together with the JDBC
interface. JDBC has not been designed for continuous streaming queries,
i.e., queries that never terminate.
Challenges would be to have an infinite, streamable ResultSet (which might
be possible) and how to represent retractions, i.e., updates of previously
emitted results (I doubt this would work).

If a streamable ResultSet was possible, a subset of queries (those that
only produce new rows and never have to update emitted results) could be
supported.

Right now, the approach would be to implement a client program that
executes queries and writes their result to a destination like Kafka or a
database using a TableSink [1].
The community is also discussing a SQL client to submit queries. [2]

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sourceSinks.html
[2] https://issues.apache.org/jira/browse/FLINK-7594
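
For illustration, a rough sketch of such a client program (field names and the CsvTableSink are placeholders; a Kafka or JDBC TableSink would be wired up the same way):

// Sketch: a standalone client program that runs one (append-only) SQL query
// and writes the continuous result via a TableSink.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<String, Long>> clicks =
    env.fromElements(Tuple2.of("alice", 12L), Tuple2.of("bob", 3L));
tEnv.registerDataStream("clicks", clicks, "name, cnt");

// Only new rows are produced here, so no retractions are involved.
Table result = tEnv.sql("SELECT name, cnt FROM clicks WHERE cnt > 10");

// CsvTableSink keeps the example short; the destination could be Kafka or a database.
result.writeToSink(new CsvTableSink("/tmp/query-result", "|"));

env.execute("sql-client-program");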


2017-09-07 21:43 GMT+02:00 kant kodali :

> Hi All,
>
> Does Flink has a JDBC server where I can submit Calcite Streaming Queries?
> such that I get Stream of responses back from Flink forever via JDBC ? What
> is the standard way to do this?
>
> Thanks,
> Kant
>


Re: File System State Backend

2017-09-08 Thread Stefan Richter
Hi,

I just tried out checkpointing with the FsStateBackend in 1.3.2 and everything works
as expected for me. Can you give a bit more detail about what you mean by "checkpoint
data is not clearing"? For example, is it not cleaned up while the job is
running, accumulating "chk-[ID]" directories, or is something left over across
multiple restarts? Which filesystem are you using for the checkpoints, e.g.
local, HDFS, S3, ...? Does this also happen for other jobs?

Best,
Stefan

> Am 08.09.2017 um 03:17 schrieb rnosworthy :
> 
> Flink 1.3.2
> 
> I have 1 vm for the job manager and another for task manager.
> 
> I have a custom windowing trigger shown below.
> 
> My checkpoint data is not clearing.
> 
> I have tried to inject a fileStateThresholdSize when instantiating the
> FsStateBackend object, but that didn't work.
> 
> I have tried explicitly setting state.checkpoints.num-retained: 1 in the
> flink.yaml file but that didn't work either.
> 
> Not sure what else to try, can someone suggest anything.
> 
> Thanks in advance.
> 
> Ryan
> 
> ==
> 
> 
> /**
> *
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#built-in-and-custom-triggers
> *
> */
> public class ThresholdTrigger extends Trigger<MonitorProbe, TimeWindow> {
> 
>  private static final Logger LOG =
> LoggerFactory.getLogger(ThresholdTrigger.class);
>  private static final long serialVersionUID = 1L;
>  private static final SimpleDateFormat sdf = new SimpleDateFormat("dd
> HH:mm:ss a");
> 
>  private final ValueStateDescriptor<Integer> maxCountDesc =
>  new ValueStateDescriptor<>(
>  "max",
>  TypeInformation.of(new TypeHint<Integer>() {}));
> 
>  private final ReducingStateDescriptor<Integer> currentCountDesc =
>  new ReducingStateDescriptor<>(
>  "count",
>  new Sum(),
>  IntSerializer.INSTANCE);
> 
>  @Override
>  public TriggerResult onElement(MonitorProbe probe, long timestamp,
> TimeWindow window, TriggerContext ctx)
>  throws Exception {
> 
>if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
>  // if the watermark is already past the window fire immediately
>  clear(window, ctx);
>  return TriggerResult.FIRE_AND_PURGE;
>}
> 
> ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);
> ReducingState<Integer> currentCount =
> ctx.getPartitionedState(currentCountDesc);
>currentCount.add(1);
> 
>if (maxCount.value() == null) {
>  maxCount.update(probe.getThresholdConfig().getSampleSize());
>}
> 
>LOG.info("{} Window: {} - {} ({} - {}), Total Sample Size: [{}/{}]",
>probe.getLoggingKey(),
>window.getStart(), window.getEnd(),
>sdf.format(new Date(window.getStart())),
>sdf.format(new Date(window.getEnd())),
>currentCount.get(), maxCount.value());
> 
>if (currentCount.get().equals(maxCount.value())){
>  clear(window, ctx);
>  return TriggerResult.FIRE_AND_PURGE;
>}else{
>  ctx.registerEventTimeTimer(window.maxTimestamp());
>  return TriggerResult.CONTINUE;
>}
> 
>  }
> 
>  @Override
>  public TriggerResult onProcessingTime(long time, TimeWindow window,
> TriggerContext ctx)
>  throws Exception {
>throw new UnsupportedOperationException("This is not processing time
> trigger");
>  }
> 
>  @Override
>  public TriggerResult onEventTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
> 
> ReducingState<Integer> currentCount =
> ctx.getPartitionedState(currentCountDesc);
> ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);
> 
>if (currentCount.get().equals(maxCount.value())){
>  clear(window, ctx);
>  return TriggerResult.FIRE_AND_PURGE;
>}else{
>  clear(window, ctx);
>  return TriggerResult.PURGE;
>}
>  }
> 
>  @Override
>  public void clear(TimeWindow window, TriggerContext ctx) throws Exception
> {
>ctx.getPartitionedState(currentCountDesc).clear();
>ctx.getPartitionedState(maxCountDesc).clear();
>  }
> 
>  @Override
>  public String toString() {
>return "ThresholdTrigger(" + maxCountDesc + ")";
>  }
> 
>  private static class Sum implements ReduceFunction<Integer> {
> 
>private static final long serialVersionUID = 1L;
> 
>@Override
>public Integer reduce(Integer value1, Integer value2) throws Exception {
>  return value1 + value2;
>}
> 
>  }
> }
> 
> 
> 
> 



Re: Does Flink has a JDBC server where I can submit Calcite Streaming Queries?

2017-09-08 Thread kant kodali
Hi Fabian,

Thanks for the response. I understand the common approach is to write a
client program and run it; however, this will not allow me to send ad-hoc
queries. Is there any way for me to submit Calcite SQL to Flink via REST or
some other mechanism? Forget the result set for now; once I know there is a way
to submit ad-hoc queries, I can figure out a way to write the results to Kafka.

Thanks,
kant

On Fri, Sep 8, 2017 at 12:52 AM, Fabian Hueske  wrote:

> Hi Kant,
>
> no, there is no such functionality.
> I'm also not sure how well streaming would work together with the JDBC
> interface. JDBC has not been designed for continuous streaming queries,
> i.e., queries that never terminate.
> Challenges would be to have an infinite, streamable ResultSet (which might
> be possible) and how to represent retractions, i.e., updates of previously
> emitted results (I doubt this would work).
>
> If a streamable ResultSet was possible, a subset of queries (those that
> only produce new rows and never have to update emitted results) could be
> supported.
>
> Right now, the approach would be to implement a client program that
> executes queries and writes their result to a destination like Kafka or a
> database using a TableSink [1].
> The community is also discussing a SQL client to submits queries. [2]
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/
> sourceSinks.html
> [2] https://issues.apache.org/jira/browse/FLINK-7594
>
>
> 2017-09-07 21:43 GMT+02:00 kant kodali :
>
>> Hi All,
>>
>> Does Flink has a JDBC server where I can submit Calcite Streaming
>> Queries? such that I get Stream of responses back from Flink forever via
>> JDBC ? What is the standard way to do this?
>>
>> Thanks,
>> Kant
>>
>
>


Quick checkpointing related question

2017-09-08 Thread Martin Eden
Hi all,

I have a Flink 1.3.1 job with a source that implements
CheckpointingFunction.

As I understand it, the notifyCheckpointComplete callback is called when
all the downstream operators in the DAG successfully finished their
checkpoints.

Since I am doing some work in this method, I would like to know if the
latency of the execution of this method is reflected in any of the
checkpointing stats of the source operator? If yes which one? End To End
Duration / Checkpoint Duration sync or async / Alignment duration?

Thanks,
M


Re: Quick checkpointing related question

2017-09-08 Thread Stefan Richter
Hi,

the method is only called after the checkpoint completed on the job manager. At 
this point _all_ work for the checkpoint is done, so doing work in this 
callback does not add any overhead to the checkpoint.
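
For reference, a minimal sketch of a source using this callback (assuming the callback comes from Flink's CheckpointListener interface, which is what exposes notifyCheckpointComplete to user functions):

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class CountingSource extends RichSourceFunction<Long> implements CheckpointListener {

  private volatile boolean running = true;
  private long counter = 0;

  @Override
  public void run(SourceContext<Long> ctx) throws Exception {
    while (running) {
      synchronized (ctx.getCheckpointLock()) {
        ctx.collect(counter++);
      }
      Thread.sleep(10);
    }
  }

  @Override
  public void cancel() {
    running = false;
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // Runs only after the JobManager has confirmed the checkpoint, so work done here
    // is not reflected in the checkpoint duration metrics mentioned above.
    System.out.println("Checkpoint " + checkpointId + " is complete");
  }
}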

Best,
Stefan

> Am 08.09.2017 um 10:20 schrieb Martin Eden :
> 
> Hi all,
> 
> I have a Flink 1.3.1 job with a source that implements CheckpointingFunction.
> 
> As I understand it, the notifyCheckpointComplete callback is called when all 
> the downstream operators in the DAG successfully finished their checkpoints.
> 
> Since I am doing some work in this method, I would like to know if the 
> latency of the execution of this method is reflected in any of the 
> checkpointing stats of the source operator? If yes which one? End To End 
> Duration / Checkpoint Duration sync or async / Alignment duration?
> 
> Thanks,
> M
> 
> 



Re: Does Flink has a JDBC server where I can submit Calcite Streaming Queries?

2017-09-08 Thread Fabian Hueske
As I said, there is no such functionality built into Flink yet.

A client program can be parameterized with a query and turned into a SQL
client that way.
The submission would work with the regular Flink job client, i.e., it would
pick up the regular Flink config.
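
A sketch of such a parameterized client (the --query and --output argument names are just examples; the table setup is omitted):

// Sketch: the SQL statement is passed as a program argument, e.g.
//   flink run my-job.jar --query "SELECT ..." --output /tmp/out.csv
public static void main(String[] args) throws Exception {
  ParameterTool params = ParameterTool.fromArgs(args);

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
  // ... register the available tables/sources on tEnv here ...

  Table result = tEnv.sql(params.getRequired("query"));
  result.writeToSink(new CsvTableSink(params.getRequired("output"), ","));

  env.execute("parameterized-sql-job");
}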

Best, Fabian

2017-09-08 10:05 GMT+02:00 kant kodali :

> Hi Fabian,
>
> Thanks for the response. I understand the common approach is to write a
> client program and run it however this will not allow me to send queries
> Ad-hoc so Is there anyway for me to submit Calcite SQL to Flink via REST or
> whatever mechanism? Forgot even, the result set once I know there is a way
> to submit Adhoc queries I can figure out a way to write to Kafka.
>
> Thanks,
> kant
>
> On Fri, Sep 8, 2017 at 12:52 AM, Fabian Hueske  wrote:
>
>> Hi Kant,
>>
>> no, there is no such functionality.
>> I'm also not sure how well streaming would work together with the JDBC
>> interface. JDBC has not been designed for continuous streaming queries,
>> i.e., queries that never terminate.
>> Challenges would be to have an infinite, streamable ResultSet (which
>> might be possible) and how to represent retractions, i.e., updates of
>> previously emitted results (I doubt this would work).
>>
>> If a streamable ResultSet was possible, a subset of queries (those that
>> only produce new rows and never have to update emitted results) could be
>> supported.
>>
>> Right now, the approach would be to implement a client program that
>> executes queries and writes their result to a destination like Kafka or a
>> database using a TableSink [1].
>> The community is also discussing a SQL client to submits queries. [2]
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/table/sourceSinks.html
>> [2] https://issues.apache.org/jira/browse/FLINK-7594
>>
>>
>> 2017-09-07 21:43 GMT+02:00 kant kodali :
>>
>>> Hi All,
>>>
>>> Does Flink has a JDBC server where I can submit Calcite Streaming
>>> Queries? such that I get Stream of responses back from Flink forever via
>>> JDBC ? What is the standard way to do this?
>>>
>>> Thanks,
>>> Kant
>>>
>>
>>
>


Re: Quick checkpointing related question

2017-09-08 Thread Martin Eden
Thanks for the prompt reply Stefan!

On Fri, Sep 8, 2017 at 9:25 AM, Stefan Richter 
wrote:

> Hi,
>
> the method is only called after the checkpoint completed on the job
> manager. At this point _all_ work for the checkpoint is done, so doing work
> in this callback does not add any overhead to the checkpoint.
>
> Best,
> Stefan
>
> > Am 08.09.2017 um 10:20 schrieb Martin Eden :
> >
> > Hi all,
> >
> > I have a Flink 1.3.1 job with a source that implements
> CheckpointingFunction.
> >
> > As I understand it, the notifyCheckpointComplete callback is called when
> all the downstream operators in the DAG successfully finished their
> checkpoints.
> >
> > Since I am doing some work in this method, I would like to know if the
> latency of the execution of this method is reflected in any of the
> checkpointing stats of the source operator? If yes which one? End To End
> Duration / Checkpoint Duration sync or async / Alignment duration?
> >
> > Thanks,
> > M
> >
> >
>
>


Re: Does Flink has a JDBC server where I can submit Calcite Streaming Queries?

2017-09-08 Thread kant kodali
Got it! Thanks a lot for your detailed explanation.

On Fri, Sep 8, 2017 at 1:27 AM, Fabian Hueske  wrote:

> As I said, there is no such functionality built into Flink yet.
>
> A client program can be parameterized with a query and turned into a SQL
> client that way.
> The submission would work with the regular Flink job client, i.e., it
> would pickup the regular Flink config.
>
> Best, Fabian
>
> 2017-09-08 10:05 GMT+02:00 kant kodali :
>
>> Hi Fabian,
>>
>> Thanks for the response. I understand the common approach is to write a
>> client program and run it however this will not allow me to send queries
>> Ad-hoc so Is there anyway for me to submit Calcite SQL to Flink via REST or
>> whatever mechanism? Forgot even, the result set once I know there is a way
>> to submit Adhoc queries I can figure out a way to write to Kafka.
>>
>> Thanks,
>> kant
>>
>> On Fri, Sep 8, 2017 at 12:52 AM, Fabian Hueske  wrote:
>>
>>> Hi Kant,
>>>
>>> no, there is no such functionality.
>>> I'm also not sure how well streaming would work together with the JDBC
>>> interface. JDBC has not been designed for continuous streaming queries,
>>> i.e., queries that never terminate.
>>> Challenges would be to have an infinite, streamable ResultSet (which
>>> might be possible) and how to represent retractions, i.e., updates of
>>> previously emitted results (I doubt this would work).
>>>
>>> If a streamable ResultSet was possible, a subset of queries (those that
>>> only produce new rows and never have to update emitted results) could be
>>> supported.
>>>
>>> Right now, the approach would be to implement a client program that
>>> executes queries and writes their result to a destination like Kafka or a
>>> database using a TableSink [1].
>>> The community is also discussing a SQL client to submits queries. [2]
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> dev/table/sourceSinks.html
>>> [2] https://issues.apache.org/jira/browse/FLINK-7594
>>>
>>>
>>> 2017-09-07 21:43 GMT+02:00 kant kodali :
>>>
 Hi All,

 Does Flink has a JDBC server where I can submit Calcite Streaming
 Queries? such that I get Stream of responses back from Flink forever via
 JDBC ? What is the standard way to do this?

 Thanks,
 Kant

>>>
>>>
>>
>


Re: Flink Job Deployment

2017-09-08 Thread Fabian Hueske
Hi Rinat,

no, this is unfortunately not possible.
When a job is submitted, all required JARs are copied into an HDFS location
that's job-specific.

Best, Fabian

2017-09-04 13:11 GMT+02:00 Rinat :

> Hi folks !
> I’ve got a question about running flink job on the top of YARN.
> Is there any possibility to store job sources in hdfs, for example
>
> /app/flink/job-name/
>   - /lib/*.jar
>   - /etc/*.properties
>
> and specify directories, that should be added to the job classpath ?
>
> Thx.
>
>
>
>


Re: Flink Job Deployment (Not enough resources)

2017-09-08 Thread Fabian Hueske
Hi Rinat,

No, Flink does not have a switch to immediately cancel a job if it cannot
allocate enough resources.
Maybe YARN has a configuration parameter to define a timeout after which a
job is canceled if no resources become available.

2017-09-04 13:29 GMT+02:00 Rinat :

> Hi everyone, I've got the following problem: when I'm trying to submit a new
> job and the cluster does not have enough resources, the job submission fails with the
> following exception.
> But in *YARN* the job hangs and waits for the requested resources. When
> resources become available, the job runs successfully.
>
> What can I do to be sure that the job startup either completed successfully or
> failed completely?
>
> Thx.
>
> The program finished with the following exception:
>
> java.lang.RuntimeException: Unable to tell application master to stop once the specified job has been finised
>     at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:177)
>     at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:201)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>     at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
>     at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1129)
> Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
>     at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:789)
>     at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:171)
>     ... 15 more
> Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway.
>     at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:79)
>     at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:784)
>     ... 16 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds]
>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>     at scala.concurrent.Await$.result(package.scala:190)
>     at scala.concurrent.Await.result(package.scala)
>     at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:77)
>     ... 17 more
>
> (The same stack trace is repeated in the "stderr_lines" part of the output.)

Re: Handle event time

2017-09-08 Thread AndreaKinn
Thank you. I actually also developed a simple custom watermark solution by
looking at the Flink docs, but I still see unordered printed streams.
I have a doubt about Flink's behaviour: if I understand correctly, Flink doesn't
automatically reorder the records in a stream, so if, for instance, a record
arrives late, what is Flink's behaviour? The docs describe that elements that
arrive late are dropped (the allowed lateness default value is 0), but even with
this watermark emitter:

public class CustomTimestampExtractor implements
    AssignerWithPeriodicWatermarks<Tuple6> {

  private static final long serialVersionUID = 5448621759931440489L;
  private final long maxOutOfOrderness = 0;
  private long currentMaxTimestamp;

  @Override
  public long extractTimestamp(Tuple6 element, long previousElementTimestamp) {
    long timestamp = element.f2.getTime();
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
    return timestamp;
  }

  @Override
  public Watermark getCurrentWatermark() {
    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  }
}

with maxOutOfOrderness = 0 I still see unordered records in the stream.

What I want to obtain is a fully ordered stream; is there a way to implement
that?
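
Flink indeed does not reorder records by itself. One common pattern for producing a sorted output (just a sketch, not from this thread; it assumes the stream is keyed, timestamps and watermarks are assigned upstream, and the element type has a getTimestamp() accessor, named here only for illustration) is to buffer elements in a ProcessFunction and flush them in timestamp order once the watermark has passed them:

// Sketch: buffer elements and emit them in event-time order once the watermark
// has passed their timestamps (late elements are not handled here).
public class SortingFunction extends ProcessFunction<Event, Event> {

  private transient ListState<Event> buffer;

  @Override
  public void open(Configuration parameters) {
    buffer = getRuntimeContext().getListState(
        new ListStateDescriptor<>("buffer", Event.class));
  }

  @Override
  public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
    buffer.add(value);
    // Ask to be called back when the watermark reaches this element's timestamp.
    ctx.timerService().registerEventTimeTimer(ctx.timestamp());
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
    List<Event> ready = new ArrayList<>();
    List<Event> notYet = new ArrayList<>();
    for (Event e : buffer.get()) {
      if (e.getTimestamp() <= timestamp) {
        ready.add(e);
      } else {
        notYet.add(e);
      }
    }
    ready.sort(Comparator.comparingLong(Event::getTimestamp));
    for (Event e : ready) {
      out.collect(e);          // emitted in timestamp order
    }
    buffer.clear();
    for (Event e : notYet) {
      buffer.add(e);           // keep elements the watermark has not reached yet
    }
  }
}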





Re: Apache Phenix integration

2017-09-08 Thread Flavio Pompermaier
I opened an issue for this: https://issues.apache.org/jira/browse/FLINK-7605

On Wed, Sep 6, 2017 at 4:24 PM, Flavio Pompermaier 
wrote:

> Maybe this should be well documented also...is there any dedicated page to
> Flink and JDBC connectors?
>
> On Wed, Sep 6, 2017 at 4:12 PM, Fabian Hueske  wrote:
>
>> Great!
>>
>> If you want to, you can open a PR that adds
>>
>> if (!conn.getAutoCommit()) {
>>   conn.setAutoCommit(true);
>> }
>>
>> to JdbcOutputFormat.open().
>>
>> Cheers, Fabian
>>
>>
>>
>> 2017-09-06 15:55 GMT+02:00 Flavio Pompermaier :
>>
>>> Hi Fabian,
>>> thanks for the detailed answer. Obviously you are right :)
>>> As stated by https://phoenix.apache.org/tuning.html auto-commit is
>>> disabled by default in Phoenix, but it can be easily enabled just appending
>>> AutoCommit=true to the connection URL or, equivalently, setting the proper
>>> property in the conf object passed to the Phoenix
>>> QueryUtil.getConnectionUrl method that autogenerate the connection URL,
>>> i.e.:
>>>
>>> --
>>> Job job = Job.getInstance(HBaseConfiguration.create(),
>>> "phoenix-mr-job");
>>> jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>>> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>>> final Properties phoenixProps = PropertiesUtil.extractProperties(new
>>> Properties(), jobConf);
>>> String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>>> --
>>>
>>> Now my job works also with the standard Flink JDBCOutputformat.
>>> Just to help other people willing to play with Phoenix and HBase I paste
>>> below my simple test job:
>>>
>>> @Test
>>>   public void testPhoenixOutputFormat() throws Exception {
>>>
>>> final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
>>> senv.enableCheckpointing(5000);
>>> DataStream testStream = senv.fromElements("1,aaa,XXX",
>>> "2,bbb,YYY", "3,ccc,ZZZ");
>>>
>>> // Set the target Phoenix table and the columns
>>> DataStream rows = testStream.map(new MapFunction()
>>> {
>>>
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   @Override
>>>   public Row map(String str) throws Exception {
>>> String[] split = str.split(Pattern.quote(","));
>>> Row ret = new Row(3);
>>> ret.setField(0, split[0]);
>>> ret.setField(1, split[1]);
>>> ret.setField(2, split[2]);
>>> return ret;
>>>   }
>>> }).returns(new RowTypeInfo(BasicTypeInfo.STRI
>>> NG_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.
>>> STRING_TYPE_INFO));
>>>
>>> Job job = Job.getInstance(HBaseConfiguration.create(),
>>> "phoenix-mr-job");
>>> PhoenixMapReduceUtil.setOutput(job, "MY_TABLE",
>>> "FIELD_1,FIELD2,FIELD_3");
>>> final org.apache.hadoop.conf.Configuration jobConf =
>>> job.getConfiguration();
>>> jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>>> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>>> final String upsertStatement = PhoenixConfigurationUtil.getUp
>>> sertStatement(jobConf);
>>> final Properties phoenixProps = PropertiesUtil.extractProperties(new
>>> Properties(), jobConf);
>>> String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>>>
>>> rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
>>> .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.g
>>> etCanonicalName())
>>> .setDBUrl(connUrl)
>>> .setQuery(upsertStatement)
>>> .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR,
>>> Types.VARCHAR})
>>> .finish());
>>>
>>> senv.execute();
>>>   }
>>>
>>> Best,
>>> Flavio
>>>
>>> On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske  wrote:
>>>
 Hi,

 According to the JavaDocs of java.sql.Connection, commit() will throw
 an exception if the connection is in auto commit mode which should be the
 default.
 So adding this change to the JdbcOutputFormat seems a bit risky.

 Maybe the Phoenix JDBC connector does not enable auto commits by
 default (or doesn't support it). Can you check that Flavio?
 If the Phoenix connector supports but not activates auto commits by
 default, we can enable it in JdbcOutputFormat.open().
 If auto commits are not supported, we can add a check after execute()
 and call commit() only if Connection.getAutoCommit() returns false.

 Best, Fabian


 2017-09-06 11:38 GMT+02:00 Flavio Pompermaier :

> Hi to all,
> I'm writing a job that uses Apache Phoenix.
>
> At first I used the PhoenixOutputFormat as (hadoop) OutputFormat but
> it's not well suited to work with Table API because it cannot handle
> generic objects like Rows (it need a DBWritable Object that should be
> already present at compile time). So I've looked into the code of the
> PhoenixOutputFormat and it's nothing else than a JDBCOutputFormat
> (basically).
>
> However, to make it work I had to slightly modify the Flink
> JD

BucketingSink never closed

2017-09-08 Thread Flavio Pompermaier
Hi to all,
I'm trying to test a streaming job, but the files written by
the BucketingSink are never finalized (they remain in the pending state).
Is this caused by the fact that the job finishes before a checkpoint?
Shouldn't the sink close properly anyway?

This is my code:

  @Test
  public void testBucketingSink() throws Exception {
final StreamExecutionEnvironment senv =
StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tEnv =
TableEnvironment.getTableEnvironment(senv);
senv.enableCheckpointing(5000);
DataStream<String> testStream = senv.fromElements(//
"1,aaa,white", //
"2,bbb,gray", //
"3,ccc,white", //
"4,bbb,gray", //
"5,bbb,gray" //
);
final RowTypeInfo rtf = new RowTypeInfo(
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {

  private static final long serialVersionUID = 1L;

  @Override
  public Row map(String str) throws Exception {
String[] split = str.split(Pattern.quote(","));
Row ret = new Row(3);
ret.setField(0, split[0]);
ret.setField(1, split[1]);
ret.setField(2, split[2]);
return ret;
  }
}).returns(rtf);

String columnNames = "id,value,state";
final String dsName = "test";
tEnv.registerDataStream(dsName, rows, columnNames);
final String whiteAreaFilter = "state = 'white'";
DataStream<Row> grayArea = rows;
DataStream<Row> whiteArea = null;
if (whiteAreaFilter != null) {
  String sql = "SELECT *, (%s) as _WHITE FROM %s";
  sql = String.format(sql, whiteAreaFilter, dsName);
  Table table = tEnv.sql(sql);
  grayArea =
tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
  DataStream<Row> nw =
tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
  whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
}
Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");

String datasetWhiteDir = "/tmp/bucket/white";
BucketingSink<Row> whiteAreaSink = new
BucketingSink<>(datasetWhiteDir.toString());
whiteAreaSink.setWriter(bucketSinkwriter);
whiteAreaSink.setBatchSize(10);
whiteArea.addSink(whiteAreaSink);

String datasetGrayDir = "/tmp/bucket/gray";
BucketingSink<Row> grayAreaSink = new
BucketingSink<>(datasetGrayDir.toString());
grayAreaSink.setWriter(bucketSinkwriter);
grayAreaSink.setBatchSize(10);
grayArea.addSink(grayAreaSink);

JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
System.out.printf("Job took %s minutes",
jobInfo.getNetRuntime(TimeUnit.MINUTES));
  }







public class RowCsvWriter extends StreamWriterBase<Row> {
  private static final long serialVersionUID = 1L;

  private final String charsetName;
  private transient Charset charset;
  private String fieldDelimiter;
  private String recordDelimiter;
  private boolean allowNullValues = true;
  private boolean quoteStrings = false;

  /**
   * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset
to convert strings to
   * bytes.
   */
  public RowCsvWriter() {
this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER,
CsvOutputFormat.DEFAULT_LINE_DELIMITER);
  }

  /**
   * Creates a new {@code StringWriter} that uses the given charset to
convert strings to bytes.
   *
   * @param charsetName Name of the charset to be used, must be valid input
for
   *{@code Charset.forName(charsetName)}
   */
  public RowCsvWriter(String charsetName, String fieldDelimiter, String
recordDelimiter) {
this.charsetName = charsetName;
this.fieldDelimiter = fieldDelimiter;
this.recordDelimiter = recordDelimiter;
  }

  @Override
  public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);

try {
  this.charset = Charset.forName(charsetName);
} catch (IllegalCharsetNameException ex) {
  throw new IOException("The charset " + charsetName + " is not
valid.", ex);
} catch (UnsupportedCharsetException ex) {
  throw new IOException("The charset " + charsetName + " is not
supported.", ex);
}
  }

  @Override
  public void write(Row element) throws IOException {
FSDataOutputStream outputStream = getStream();
writeRow(element, outputStream);
  }

  private void writeRow(Row element, FSDataOutputStream out) throws
IOException {
int numFields = element.getArity();

for (int i = 0; i < numFields; i++) {
  Object obj = element.getField(i);
  if (obj != null) {
if (i != 0) {
  out.write(this.fieldDelimiter.getBytes(charset));
}

if (quoteStrings) {
  if (obj instanceof String || obj instanceof StringValue) {
out.write('"');
out.write(obj.toString().getBytes(charset));
out.write('"');
  } else {
out.write(obj.toString().getBytes(charset));

Assigning operators to slots

2017-09-08 Thread AndreaKinn
Hi, firstly excuse me for the long post.
I already read the documentation about parallelism, slots and the API about
it but I still have some doubts about practical implementations of them.
My program is composed essentially by three operations:

- get data from a kafka source
- perform a machine learning operator on the retrieved stream
- push out data to a cassandra sink

I'd like to investigate and trying implement them in two different
situations:


1) FIRST ONE

Imagine I have a single dual core physical node and suppose I set
NumberOfTaskSlot = NumberOfCore (As suggested by the doc). 


 

I suppose I can divide the operations among the slots in a fixed way, as described
in the figure. Is this possible?
Can I do that using the slotSharingGroup(groupname) method? Or do I have to use
startNewChain() between the operators?
Example:

*DataStream stream = env
.addSource(new FlinkKafkaConsumer010<>(TOPIC, 
new CustomDeserializer(),
properties))
.assignTimestampsAndWatermarks(new 
CustomTimestampExtractor())
.map(...)
.slotSharingGroup("source");*

or

*DataStream stream = env
.addSource(new FlinkKafkaConsumer010<>(TOPIC, 
new CustomDeserializer(),
properties))
.startNewChain()
.assignTimestampsAndWatermarks(new 
CustomTimestampExtractor())
.startNewChain()
.map(...);
*


2) SECOND ONE

Imagine I have 3 dual core physical nodes.


 

I suppose I can reserve one physical NODE for each operation. Is this
possible?
In this case I honestly don't know how to implement that at the code level.
Moreover, I don't know whether it would make sense to set NumberOfTaskSlots =
NumberOfCores or to leave this option to Flink's choice.










How to user ParameterTool.fromPropertiesFile() to get resource file inside my jar

2017-09-08 Thread Tony Wei
Hi,

I put my configuration file in `./src/main/resources/` and packed it
inside my jar.
I want to run it on a standalone cluster by using the web UI to submit my job.
No matter which way I tried, ParameterTool.fromPropertiesFile()
couldn't find the file path, but threw a `FileNotFoundException` instead.
Is there any best practice to deal with such a problem? Thanks for your help.

Best Regards,
Tony Wei


Re: BucketingSink never closed

2017-09-08 Thread Kostas Kloudas
Hi Flavio,

If I understand correctly, I think you bumped into this issue: 
https://issues.apache.org/jira/browse/FLINK-2646 


There is also a similar discussion on the BucketingSink here: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468
 


Kostas

> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier  wrote:
> 
> Hi to all,
> I'm trying to test a streaming job but the files written by the BucketingSink 
> are never finalized (remains into the pending state).
> Is this caused by the fact that the job finishes before the checkpoint?
> Shouldn't the sink properly close anyway?
> 
> This is my code:
> 
>   @Test
>   public void testBucketingSink() throws Exception {
> final StreamExecutionEnvironment senv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> final StreamTableEnvironment tEnv = 
> TableEnvironment.getTableEnvironment(senv);
> senv.enableCheckpointing(5000);
> DataStream testStream = senv.fromElements(//
> "1,aaa,white", //
> "2,bbb,gray", //
> "3,ccc,white", //
> "4,bbb,gray", //
> "5,bbb,gray" //
> );
> final RowTypeInfo rtf = new RowTypeInfo(
> BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO);
> DataStream rows = testStream.map(new MapFunction() {
> 
>   private static final long serialVersionUID = 1L;
> 
>   @Override
>   public Row map(String str) throws Exception {
> String[] split = str.split(Pattern.quote(","));
> Row ret = new Row(3);
> ret.setField(0, split[0]);
> ret.setField(1, split[1]);
> ret.setField(2, split[2]);
> return ret;
>   }
> }).returns(rtf);
> 
> String columnNames = "id,value,state";
> final String dsName = "test";
> tEnv.registerDataStream(dsName, rows, columnNames);
> final String whiteAreaFilter = "state = 'white'";
> DataStream grayArea = rows;
> DataStream whiteArea = null;
> if (whiteAreaFilter != null) {
>   String sql = "SELECT *, (%s) as _WHITE FROM %s";
>   sql = String.format(sql, whiteAreaFilter, dsName);
>   Table table = tEnv.sql(sql);
>   grayArea = 
> tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
>   DataStream nw = 
> tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
>   whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
> }
> Writer bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");
> 
> String datasetWhiteDir = "/tmp/bucket/white";
> BucketingSink whiteAreaSink = new 
> BucketingSink<>(datasetWhiteDir.toString());
> whiteAreaSink.setWriter(bucketSinkwriter);
> whiteAreaSink.setBatchSize(10);
> whiteArea.addSink(whiteAreaSink);
> 
> String datasetGrayDir = "/tmp/bucket/gray";
> BucketingSink grayAreaSink = new 
> BucketingSink<>(datasetGrayDir.toString());
> grayAreaSink.setWriter(bucketSinkwriter);
> grayAreaSink.setBatchSize(10);
> grayArea.addSink(grayAreaSink);
> 
> JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
> System.out.printf("Job took %s minutes", 
> jobInfo.getNetRuntime(TimeUnit.MINUTES));
>   }
> 
> 
> 
> 
> 
> 
> 
> public class RowCsvWriter extends StreamWriterBase {
>   private static final long serialVersionUID = 1L;
> 
>   private final String charsetName;
>   private transient Charset charset;
>   private String fieldDelimiter;
>   private String recordDelimiter;
>   private boolean allowNullValues = true;
>   private boolean quoteStrings = false;
> 
>   /**
>* Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to 
> convert strings to
>* bytes.
>*/
>   public RowCsvWriter() {
> this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, 
> CsvOutputFormat.DEFAULT_LINE_DELIMITER);
>   }
> 
>   /**
>* Creates a new {@code StringWriter} that uses the given charset to 
> convert strings to bytes.
>*
>* @param charsetName Name of the charset to be used, must be valid input 
> for
>*{@code Charset.forName(charsetName)}
>*/
>   public RowCsvWriter(String charsetName, String fieldDelimiter, String 
> recordDelimiter) {
> this.charsetName = charsetName;
> this.fieldDelimiter = fieldDelimiter;
> this.recordDelimiter = recordDelimiter;
>   }
> 
>   @Override
>   public void open(FileSystem fs, Path path) throws IOException {
> super.open(fs, path);
> 
> try {
>   this.charset = Charset.forName(charsetName);
> } catch (IllegalCharsetNameException ex) {
>   throw new IOException("The charset " + charsetName + " is not valid.", 
> ex);
> } catch (UnsupportedCharsetException ex) {
>   throw ne

Re: Assigning operators to slots

2017-09-08 Thread Aljoscha Krettek
Hi,

For the first question, I think both approaches should work. You only have to 
be careful about startNewChain() because the behaviour can be somewhat 
unexpected. What it does is specify, that a new chain should be started with 
the operator on which you call startNewChain(). For example, in:

DataStream input = ...

input
  .map().name("map1")
  .map().name("map2")
  .startNewChain()
  .map().name("map3")

You will have one chain ("map1") and a second chain ("map2", "map3").

For the second question, I think to make sure that each operator is on a 
separate machine you would have to set the number of slots to 1. This way you 
get 3 slots and if you set the resource group or chaining right you will have 
each operator on a different slot.
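
As a sketch of the slot-sharing-group variant (the record type and the two function names below are placeholders; group names are arbitrary, and an operator inherits the group of its input unless one is set explicitly):

// Sketch: isolate the three stages into separate slot sharing groups.
DataStream<SensorReading> stream = env
    .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
    .slotSharingGroup("sources");      // SensorReading is a placeholder for the record type

stream
    .map(new LearningOperator())            // placeholder for the machine-learning step
    .slotSharingGroup("learning")
    .addSink(new CassandraSinkFunction())   // placeholder for the Cassandra sink
    .slotSharingGroup("sinks");

With taskmanager.numberOfTaskSlots set to 1 on each of the three machines, the three groups cannot share a slot and therefore end up on different TaskManagers.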

Best,
Aljoscha

> On 8. Sep 2017, at 16:32, AndreaKinn  wrote:
> 
> Hi, firstly excuse me for the long post.
> I already read the documentation about parallelism, slots and the API about
> it but I still have some doubts about practical implementations of them.
> My program is composed essentially by three operations:
> 
> - get data from a kafka source
> - perform a machine learning operator on the retrieved stream
> - push out data to a cassandra sink
> 
> I'd like to investigate and trying implement them in two different
> situations:
> 
> 
> 1) FIRST ONE
> 
> Imagine I have a single dual core physical node and suppose I set
> NumberOfTaskSlot = NumberOfCore (As suggested by the doc). 
> 
> 
>  
> 
> I suppose I can divide in a fixed way the operations into slots as described
> in the figure. Is this possible?
> Can I do that using slotSharingGroup(groupname) method ? Or have I to use
> startNewChain() between the operator?
> Example:
> 
> *DataStream stream = env
>   .addSource(new FlinkKafkaConsumer010<>(TOPIC, 
> new CustomDeserializer(),
> properties))
>   .assignTimestampsAndWatermarks(new 
> CustomTimestampExtractor())
>   .map(...)
>   .slotSharingGroup("source");*
> 
> or
> 
> *DataStream stream = env
>   .addSource(new FlinkKafkaConsumer010<>(TOPIC, 
> new CustomDeserializer(),
> properties))
> .startNewChain()
>   .assignTimestampsAndWatermarks(new 
> CustomTimestampExtractor())
> .startNewChain()
>   .map(...);
>   *
> 
> 
> 2) SECOND ONE
> 
> Imagine I have 3 dual core physical nodes.
> 
> 
>  
> 
> I suppose I can reserve one physical NODE for each operation. Is this
> possible?
> In this case honestly I don't know how to implement that at level code.
> Moreover, I don't know if it would has sense set NumberTaskSlot =
> NumberOfCores or to leave this option to Flink's choice.
> 
> 
> 
> 
> 
> 
> 
> 



Re: How to user ParameterTool.fromPropertiesFile() to get resource file inside my jar

2017-09-08 Thread Aljoscha Krettek
Hi,

How are you specifying the path for the properties file? Have you tried reading 
the properties by using this.getClass().getClassLoader().getResource()?

Best,
Aljoscha

> On 8. Sep 2017, at 16:32, Tony Wei  wrote:
> 
> Hi,
> 
> I put the my configuration file in `./src/main/resources/` and packed it 
> inside my jar.
> I want to run it on standalone cluster by using web UI to submit my job.
> No matter which way I tried, the ParameterTool.fromPropertiesFile() couldn't 
> find the file path, but threw `FileNotFoundException` instead.
> Is there any best practice to deal with such problem? Thanks for your help.
> 
> Best Regards,
> Tony Wei



Re: Assigning operators to slots

2017-09-08 Thread AndreaKinn
Nice, thank you for reply.

So if I call slotSharingGroup(groupname) on the last operator as here:

DataStream stream = env 
 .addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), 
 properties)) 
 .assignTimestampsAndWatermarks(new CustomTimestampExtractor()) 
 .map(...) 
 .slotSharingGroup("source");

is it applied to all the previous operators? Or do I have to call it after
each operator?
I.e., I want addSource, assignTimestampsAndWatermarks and map to reside in the same slot.





Re: BucketingSink never closed

2017-09-08 Thread Aljoscha Krettek
Hi,

Expanding a bit on Kostas' answer. Yes, your analysis is correct, the problem 
is that the job is shutting down before a last checkpoint can "confirm" the 
written bucket data by moving it to the final state. The problem, as Kostas 
noted is that a user function (and thus also BucketingSink) does not know 
whether close() is being called because of a failure or because normal job 
shutdown. Therefore, we cannot move data to the final stage there.

Once we have the issue that Kostas posted resolve we can also resolve this 
problem for the BucketingSink.

Best,
Aljoscha

> On 8. Sep 2017, at 16:48, Kostas Kloudas  wrote:
> 
> Hi Flavio,
> 
> If I understand correctly, I think you bumped into this issue: 
> https://issues.apache.org/jira/browse/FLINK-2646 
> 
> 
> There is also a similar discussion on the BucketingSink here: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468
>  
> 
> 
> Kostas
> 
>> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier > > wrote:
>> 
>> Hi to all,
>> I'm trying to test a streaming job but the files written by the 
>> BucketingSink are never finalized (remains into the pending state).
>> Is this caused by the fact that the job finishes before the checkpoint?
>> Shouldn't the sink properly close anyway?
>> 
>> This is my code:
>> 
>>   @Test
>>   public void testBucketingSink() throws Exception {
>> final StreamExecutionEnvironment senv = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> final StreamTableEnvironment tEnv = 
>> TableEnvironment.getTableEnvironment(senv);
>> senv.enableCheckpointing(5000);
>> DataStream testStream = senv.fromElements(//
>> "1,aaa,white", //
>> "2,bbb,gray", //
>> "3,ccc,white", //
>> "4,bbb,gray", //
>> "5,bbb,gray" //
>> );
>> final RowTypeInfo rtf = new RowTypeInfo(
>> BasicTypeInfo.STRING_TYPE_INFO,
>> BasicTypeInfo.STRING_TYPE_INFO, 
>> BasicTypeInfo.STRING_TYPE_INFO);
>> DataStream rows = testStream.map(new MapFunction() {
>> 
>>   private static final long serialVersionUID = 1L;
>> 
>>   @Override
>>   public Row map(String str) throws Exception {
>> String[] split = str.split(Pattern.quote(","));
>> Row ret = new Row(3);
>> ret.setField(0, split[0]);
>> ret.setField(1, split[1]);
>> ret.setField(2, split[2]);
>> return ret;
>>   }
>> }).returns(rtf);
>> 
>> String columnNames = "id,value,state";
>> final String dsName = "test";
>> tEnv.registerDataStream(dsName, rows, columnNames);
>> final String whiteAreaFilter = "state = 'white'";
>> DataStream grayArea = rows;
>> DataStream whiteArea = null;
>> if (whiteAreaFilter != null) {
>>   String sql = "SELECT *, (%s) as _WHITE FROM %s";
>>   sql = String.format(sql, whiteAreaFilter, dsName);
>>   Table table = tEnv.sql(sql);
>>   grayArea = 
>> tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
>>   DataStream nw = 
>> tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
>>   whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
>> }
>> Writer bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");
>> 
>> String datasetWhiteDir = "/tmp/bucket/white";
>> BucketingSink whiteAreaSink = new 
>> BucketingSink<>(datasetWhiteDir.toString());
>> whiteAreaSink.setWriter(bucketSinkwriter);
>> whiteAreaSink.setBatchSize(10);
>> whiteArea.addSink(whiteAreaSink);
>> 
>> String datasetGrayDir = "/tmp/bucket/gray";
>> BucketingSink grayAreaSink = new 
>> BucketingSink<>(datasetGrayDir.toString());
>> grayAreaSink.setWriter(bucketSinkwriter);
>> grayAreaSink.setBatchSize(10);
>> grayArea.addSink(grayAreaSink);
>> 
>> JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
>> System.out.printf("Job took %s minutes", 
>> jobInfo.getNetRuntime(TimeUnit.MINUTES));
>>   }
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> public class RowCsvWriter extends StreamWriterBase {
>>   private static final long serialVersionUID = 1L;
>> 
>>   private final String charsetName;
>>   private transient Charset charset;
>>   private String fieldDelimiter;
>>   private String recordDelimiter;
>>   private boolean allowNullValues = true;
>>   private boolean quoteStrings = false;
>> 
>>   /**
>>* Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to 
>> convert strings to
>>* bytes.
>>*/
>>   public RowCsvWriter() {
>> this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, 
>> CsvOutputFormat.DEFAULT_LINE_DELIMITER);
>>   }
>> 
>>   /**
>>* Creates a new {@code StringWriter} that uses the given ch

Re: Fwd: HA : My job didn't restart even if task manager restarted.

2017-09-08 Thread Fabian Hueske
Hi,

sorry for the late response!
I'm not familiar with the details of the failure recovery but Till (in CC)
knows the code in depth.
Maybe he can figure out what's going on.

Best, Fabian

2017-09-06 5:35 GMT+02:00 sunny yun :

> I am still struggling to solve this problem.
> I have no doubt that the JOB should automatically restart after restarting
> the TASK MANAGER in YARN MODE. Is it a misunderstood?
>
> Problem seems that *JOB MANAGER still try to connect to old TASK MANAGER
> even after new TASK MANAGER container be created.*
> When I killed TM on node#2 then new TM container is created on node#3, but
> JM still tries to connect to TM on node#2 according to the log file. (It
> was
> not a log I posted before, when I found it while continuing the test.
> Normally the TM be created on the same node after killed.)
> So new TM don't know JOB info and JM show us JOB with fail status.
>
> If anyone has succeeded in the same situation(YARN + TM FAILURE), please
> just tell me.
> That will be big help to me.
>
>
>
>


Re: How to user ParameterTool.fromPropertiesFile() to get resource file inside my jar

2017-09-08 Thread Tony Wei
Hi Aljoscha,

I have tried
`StreamJob.class.getClassLoader().getResource("application.conf").getPath()`,
but I got this exception.

Caused by: java.io.FileNotFoundException: Properties file
/home/tonywei/flink/file:/tmp/flink-web-24351e69-a261-45be-9503-087db8155a8f/d69a3ca9-bfa0-43ef-83e8-e15f38162a87_quickstart-0.1.jar!/application.conf

Best Regards,
Tony Wei
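
A workaround sketch (assuming the file only needs to be read, not resolved to an actual path on disk): load the resource as a stream via the classloader and hand the parsed entries to ParameterTool.fromMap():

// Sketch: read application.conf from inside the jar instead of via a file path.
Properties props = new Properties();
try (InputStream in =
    StreamJob.class.getClassLoader().getResourceAsStream("application.conf")) {
  props.load(in);   // assumes the resource exists; 'in' would be null otherwise
}
Map<String, String> entries = new HashMap<>();
for (String name : props.stringPropertyNames()) {
  entries.put(name, props.getProperty(name));
}
ParameterTool params = ParameterTool.fromMap(entries);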

2017-09-08 23:24 GMT+08:00 Aljoscha Krettek :

> Hi,
>
> How are you specifying the path for the properties file? Have you tried
> reading the properties by using this.getClass().
> getClassLoader().getResource()?
>
> Best,
> Aljoscha
>
> > On 8. Sep 2017, at 16:32, Tony Wei  wrote:
> >
> > Hi,
> >
> > I put the my configuration file in `./src/main/resources/` and packed it
> inside my jar.
> > I want to run it on standalone cluster by using web UI to submit my job.
> > No matter which way I tried, the ParameterTool.fromPropertiesFile()
> couldn't find the file path, but threw `FileNotFoundException` instead.
> > Is there any best practice to deal with such problem? Thanks for your
> help.
> >
> > Best Regards,
> > Tony Wei
>
>


Re: Disable job graph in web UI

2017-09-08 Thread Joshua Griffith
Upon further inspection, it appears that the web UI redraws each DOM element 
with every update. So I think removing the graph won’t fix the page performance 
issue because each task list item is being redrawn on every refresh.

> On Sep 7, 2017, at 2:22 PM, Joshua Griffith  wrote:
> 
> Hello, I have an auto-generated job that creates too many tasks for web UI’s 
> job graph to handle. The browser pinwheels while the page attempts to load. 
> Is it possible to disable the job graph component in the web UI? For slightly 
> smaller jobs, once the graph loads the rest of the UI is usable.
> 
> Thanks,
> 
> Joshua



Re: Assigning operators to slots

2017-09-08 Thread AndreaKinn
UPDATE:

I'm trying to implement the version with one node and two task slots on my
laptop. I have also configured in flink-conf.yaml the key:

taskmanager.numberOfTaskSlots: 2

but when I execute my program in the IDE I get:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration.

Parallelism is set to 1.

What could be the problem?
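
(A guess only; this thread does not confirm the cause: a job started directly from the IDE runs in a local environment that does not necessarily pick up flink-conf.yaml.) A sketch of configuring the embedded cluster explicitly in that case:

// Sketch: set the slot count programmatically when running inside the IDE.
Configuration conf = new Configuration();
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(1, conf);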





Table API and registration of DataSet/DataStream

2017-09-08 Thread Flavio Pompermaier
Hi to all,
I have a doubt about Table API.
Let's say my code is something like:


StreamTableEnvironment te = ...;
RowTypeInfo rtf = new RowTypeInfo(...);
DataStream myDs =
te.registerDataStream("test",myDs,columnNames);

Table table = te.sql("SELECT *, (NAME = 'John') as VALID FROM test WHERE
...");
myDs = te.toDataStream(table.where("VALID").select(columnNames), rtf);

If I do:

DataStream res = te.sql("SELECT * FROM test");

I'd like res to take the data from the latest version of myDs... is
this program correct?
Or should I overwrite the "test" table in the TableEnvironment? Is that
possible? I don't see any API that allows this.

Best,
Flavio


Error a simple window example

2017-09-08 Thread philippe

Hi all,

I am trying to run a simple example in the Scala shell:


case class MonEntier(classe: Int, valeur: Int)
val stream = senv.socketTextStream("localhost", 9000, '\n')
val w = stream.map ( { x => Tuple1(x.toInt) } )
 .map( {y => MonEntier(y._1 % 3, y._1) } )
 .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
 .print()


The socket at 9000 sends a stream of integers.

I get a compilation error:

:68: error: not found: value TumblingProcessingTimeWindows
       .windowAll(new TumblingProcessingTimeWindows.of(Time.seconds(5)))


The snippet comes directly from the docs. Help welcome!

Philippe


Re: File System State Backend

2017-09-08 Thread rnosworthy
Thanks for the response.

That's correct, they do not get purged/deleted while the job is running. I
have 3 concurrent jobs running and there are 3 directories in the data
directory.

/var/data/flink/2375c69006bfeca9644171f31b444dff
/var/data/flink/c3264bb6d5e068d6440bbb21069b7d28
/var/data/flink/f81d50eb4644cdf65f8f0513713c9d61

in each one of those folders there is chk-1 all the way to chk-2777, for
example.

I do not have HDFS; I have the Flink task manager on a Debian VM.





Re: File System State Backend

2017-09-08 Thread Stephan Ewen
Hi!

Checkpoints in Flink need to go to a file system that is accessible across
machines. Otherwise there could be no recovery of the data of a failed
machine.

The cleanup is also triggered by a different node than the node that
checkpoints; hence you see no cleanup in your setup.
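
A sketch of pointing checkpoints at such a location (the URIs are examples only; any filesystem path that is identical and reachable on every node works):

// Sketch: checkpoint to storage that the JobManager and all TaskManagers can reach.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
// or, with a shared mount that has the same path on every machine:
// env.setStateBackend(new FsStateBackend("file:///mnt/shared/flink/checkpoints"));
env.enableCheckpointing(60000);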

Best,
Stephan

On Fri, Sep 8, 2017 at 6:19 PM, rnosworthy  wrote:

> Thanks for the response.
>
> Thats correct, they do not get purged/deleted while the job is running. I
> have 3 concurrent jobs running and there are 3 directories in the data
> directory.
>
> /var/data/flink/2375c69006bfeca9644171f31b444dff
> /var/data/flink/c3264bb6d5e068d6440bbb21069b7d28
> /var/data/flink/f81d50eb4644cdf65f8f0513713c9d61
>
> in each one of those folders there is chk-1 all the way to chk-2777 for
> example
>
> I do not have a hdfs, I have the flink task manager on a debian vm
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Installing Apache Flink on Mesos Cluster without DC/OS

2017-09-08 Thread Rahul Raj
 Hi,

I am a newbie to Apache Flink and our team is trying to set up an Apache
Flink cluster on Apache Mesos. We have already installed Apache Mesos &
Marathon with 3 master nodes and 3 slaves, and now we are trying to install
Apache Flink without DC/OS as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html#mesos-without-dcos.

I have a couple of questions:

1. Do we need to download Flink on all the nodes (masters and slaves) and
   configure mesos.master on all nodes?

2. Or shall we download Flink on only one master node and configure
   mesos.master there?

3. If Flink needs to be downloaded on all the nodes, what should be the
   location of the Flink directory, or is there a script where I can
   specify that?

4. Is running "mesos-appmaster.sh" on the master node also responsible for
   running the Flink libraries and classes on the slaves?

Thanks

Rahul Raj


Re: File System State Backend

2017-09-08 Thread rnosworthy
Can I utilize disk on the JobManager for this, or do I need a dedicated
disk-storage VM?
How do I specify not only the directory but also the IP address/host of the
checkpoint data directory?
Are there any docs on configuring a state backend without using HDFS or S3?

thanks for your help

Ryan
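
For reference, a minimal sketch of configuring a filesystem state backend
programmatically. The path below is hypothetical, and per Stephan's note
above it must point to storage that is reachable from the JobManager and
every TaskManager (for example an NFS mount shared by all machines), not a
purely local disk:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FsCheckpointSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical shared directory, mounted on every machine of the cluster.
        env.setStateBackend(new FsStateBackend("file:///mnt/shared/flink-checkpoints"));

        // Take a checkpoint every 10 seconds.
        env.enableCheckpointing(10000);

        env.fromElements(1, 2, 3).print();
        env.execute("fs state backend sketch");
    }
}

The equivalent settings can also go into flink-conf.yaml via the
state.backend and state.backend.fs.checkpointdir keys (as of the 1.3 docs).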





Re: Question about Flink internals

2017-09-08 Thread Junguk Cho
Hi, Timo.

Thank you for the detailed replies. They help me understand Flink a lot.

However, there are a few points that I think were misinterpreted or need
clarification.

2. From a user's perspective you can only see the "real data". Internally,
there are different types of records that flow through the topology (namely
watermarks, checkpoint barriers, latency markers, and records with or
without timestamp metadata).
-> I understand there are several types of records. I am wondering about the
"record" class and its members, e.g., Tuple in Storm (
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
); a rough sketch follows at the end of this message.


6. I don't know about the internals of iteration feature but you might be
right. Cyclic dataflows are not fully supported yet. E.g. they are also not
participating in Flink's checkpointing mechanism.
->  Based on Section 3.4 of this paper (
http://www.vldb.org/pvldb/vol10/p1718-carbone.pdf), it seems that Flink
supports checkpointing for cyclic dataflows. However, there is this
limitation: "Cycled tasks act as regular dataflow source and sink
respectively, yet they are collocated in the same physical instance to
share an in-memory buffer and thus, implement loopback stream
transparently."

In general, I would recommmend to import Flink into your IDE and set a
breakpoint in an example (e.g. within a mapper before a keyBy) and run it
in debug mode. You can step through the layers to see more about the
internals. This should answer most of your question, otherwise feel free to
ask again.
-> I will try this. Thanks a lot.

Thanks,
Junguk

On Thu, Sep 7, 2017 at 6:14 AM, Timo Walther  wrote:

> Hi Junguk,
>
> I try to answer your questions, but also loop in Ufuk who might now more
> about the network internals:
>
> 1. Yes, every operator/operator chain has a "setParallelism()" method do
> specify the parallelism. The overall parallelism of the job can be set when
> submitting a job. The parallelism per TaskManager is determined by the
> number of slots.
>
> 2. From a user's perspective you can only see the "real data". Internally,
> there are different types of records that flow through the topology (namely
> watermarks, checkpoint barriers, latency markers, and records with or
> without timestamp metadata).
>
> 3. See my last comment.
>
> 4. Flink also uses heartbeat messages between JobManager and TaskManagers.
> In case of a failure the JobManager restores the entire topology to the
> last successful checkpoint. See [1] for more explanation. In the future it
> is planned to recover more fine-grained.
>
> 5. Source workers should not be directly connected but though systems like
> Kafka or Pravega. Not only for replaying in case of failures but also for
> using it as the single source of truth in case your processing logic needs
> to be adapted. E.g. you had a bug in your application and the state that
> you have built is invalid, you want to be able to correct your mistake and
> rebuild the state in a batch. The folks from Drivetribe showed a very nice
> architecture [2]. I don't know if replaying your IoT devices would make
> sense, in theory you could implement your own connector that implements a
> similar logic as Flink's Kafka consumer.
>
> 6. I don't know about the internals of iteration feature but you might be
> right. Cyclic dataflows are not fully supported yet. E.g. they are also not
> participating in Flink's checkpointing mechanism.
>
> In general, I would recommmend to import Flink into your IDE and set a
> breakpoint in an example (e.g. within a mapper before a keyBy) and run it
> in debug mode. You can step through the layers to see more about the
> internals. This should answer most of your question, otherwise feel free to
> ask again.
>
> Regards,
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> internals/stream_checkpointing.html
> [2] https://data-artisans.com/blog/drivetribe-cqrs-apache-flink
>
> Am 06.09.17 um 21:54 schrieb Junguk Cho:
>
> Hi, All.
>>
>> I am new to Flink.
>> I just installed Flink in clusters and start reading documents to
>> understand Flink internals.
>> After reading some documents, I have some questions.
>> I have some experiences of Storm and Heron before, so I am linking their
>> mechanisms to questions to better understand Flink.
>>
>> 1. Can I specify worker parallelism explicitly like Storm?
>>
>> 2. Record in Flink
>> Can I think a "record" in FLINK is almost same as Tuple in Storm?
>> Tuple in Storm is used for carrying "real data" + "metadata (e.g., stream
>> type, source id and so on).
>>
>> 3. How does partition (e.g., shuffling,  map) works internally?
>> In Storm, it has (worker id) : (tcp info to next workers) tables.
>> So, based on this information, after executing partition function, Tuple
>> is  forwarded to next hops based on tables.
>> Is it the same?
>>
>> 4. How does Flink detect fault in case of worker dead machine failure?
>> Based on documents, Job manager checks liveness of task managers with
>> heartbeat message.
>> In Stor
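
Regarding the question above about the "record" class and its members: the
closest Flink counterpart is the internal StreamRecord class (plus Watermark
and other stream elements). It is much thinner than Storm's TupleImpl: it
wraps only the user value and an optional timestamp, with no stream/source
ids. A rough sketch (class and package names as of Flink 1.3; best verified
by stepping through in the IDE, as Timo suggests):

import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class StreamElementSketch {

    public static void main(String[] args) {
        // A user value wrapped together with an (optional) event timestamp.
        StreamRecord<String> record = new StreamRecord<>("some payload", 1504900000000L);
        System.out.println(record.getValue() + " @ " + record.getTimestamp());

        // Watermarks are separate elements that flow through the same channels.
        Watermark watermark = new Watermark(1504900000000L);
        System.out.println("watermark: " + watermark.getTimestamp());
    }
}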

State Issue

2017-09-08 Thread Navneeth Krishnan
Hi,

I'm experiencing a weird issue where any data put into the map state, when
retrieved with the same key, is returned as null, and hence the same value
is put again and again. I used the RocksDB state backend but also tried the
memory state backend, and the issue still exists.

Each time I put a key and value into the MapState it seems to create a new
map, and I can't access the previously stored value. But when I iterate over
the MapState keys and values, I can see the same key added multiple times.

Each put operation goes through the code lines highlighted with comments below.

*NestedMapsStateTable.java*

S get(K key, int keyGroupIndex, N namespace) {

   checkKeyNamespacePreconditions(key, namespace);

   Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);

   if (namespaceMap == null) {
      // <-- highlighted: this branch is taken on every access
      return null;
   }

   Map<K, S> keyedMap = namespaceMap.get(namespace);

   if (keyedMap == null) {
      return null;
   }

   return keyedMap.get(key);
}


*HeapMapState.java*

@Override
public void put(UK userKey, UV userValue) {

   HashMap<UK, UV> userMap = stateTable.get(currentNamespace);

   if (userMap == null) {
      // <-- highlighted: userMap is null every time, so a fresh map is created
      userMap = new HashMap<>();
      stateTable.put(currentNamespace, userMap);
   }

   userMap.put(userKey, userValue);
}


*My Code:*

*open()*

MapStateDescriptor testStateDescriptor = new
MapStateDescriptor<>("test-state",
TypeInformation.of(new TypeHint() {}),
TypeInformation.of(new TypeHint() {}));

testState = getRuntimeContext().getMapState(testStateDescriptor);


*flatMap:*

if(testState.contains(user)){
*// DO Something*
} else {
testState.put(user, userInfo);
}


streamEnv.setStateBackend(new MemoryStateBackend());

streamEnv.setParallelism(1);


Thanks


question on sideoutput from ProcessWindow function

2017-09-08 Thread Prabhu V
Hi,

Can we have a side output from a ProcessWindowFunction?

I am currently generating a side output from a ProcessFunction. The main
output of the ProcessFunction is then windowed and a ProcessWindowFunction
is applied on the windows.

Can I add to the side output stream from the ProcessWindowFunction? I am
unable to find any API that enables this functionality.

Thanks,
Prabhu


Re: question on sideoutput from ProcessWindow function

2017-09-08 Thread Chen Qin
Hi Prabhu,

That is a good question. The short answer is: not yet. Only ProcessFunction
has been given the flexibility of emitting customized side outputs at the
moment. Window functions were not given that flexibility, partly because
side outputs initially targeted late-arriving events for window use cases.

+@Aljoscha, who might have a better picture on this question.

Thanks,
Chen

On Fri, Sep 8, 2017 at 7:19 PM, Prabhu V  wrote:

> Hi,
>
> Can we have a side output from a process window function ?
>
> I am currently genrating a side output from a process function. The main
> output of the process function is then Windowed and a ProcessWindowFunction
> is applied on the windows.
>
> Can I add to the SideOutpuStream from the ProcessWindowFunction. I am
> unable to find any api that enables this functionality.
>
> Thanks,
> Prabhu
>
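
For reference, a minimal sketch of the ProcessFunction-based side output
that is available today, since ProcessWindowFunction does not support it
yet; the tag name "rejected" and the routing rule are illustrative only:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {

    // Anonymous subclass so the element type is captured; the name is illustrative.
    private static final OutputTag<Integer> REJECTED = new OutputTag<Integer>("rejected") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> mainStream = env
            .fromElements(1, -2, 3, -4)
            .process(new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                    if (value < 0) {
                        ctx.output(REJECTED, value);   // route to the side output
                    } else {
                        out.collect(value);            // main output
                    }
                }
            });

        DataStream<Integer> rejected = mainStream.getSideOutput(REJECTED);

        mainStream.print();
        rejected.print();
        env.execute("side output sketch");
    }
}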