[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r202574665
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 ---
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.Writer;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.ResumableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Sink that emits its input elements to {@link FileSystem} files within buckets. This is
+ * integrated with the checkpointing mechanism to provide exactly once semantics.
+ *
+ * When creating the sink a {@code basePath} must be specified. The base directory contains
+ * one directory for every bucket. The bucket directories themselves contain several part files,
+ * with at least one for each parallel subtask of the sink which is writing data to that bucket.
+ * These part files contain the actual output data.
+ *
+ * The sink uses a {@link Bucketer} to determine in which bucket directory each element should
+ * be written to inside the base directory. The {@code Bucketer} can, for example, use time or
+ * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
+ * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
+ * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
+ *
+ * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
+ * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
+ * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask.
+ * When a part file becomes bigger than the user-specified part size or when the part file becomes older
+ * than the user-specified roll over interval the current part file is closed, the part counter is increased
+ * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
+ * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and
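
The quoted Javadoc fully describes the part-file naming and rolling rules. A minimal sketch of those two rules, with helper names that are illustrative and not code from the PR:

```java
// Minimal sketch of the rules described in the Javadoc above: part files are
// named "part-<subtaskIndex>-<counter>" and are rolled when they become too
// big or too old. The helper names are illustrative, not from the PR.
final class PartFileRulesSketch {

    // e.g. subtaskIndex = 1, partCounter = 17  ->  "part-1-17"
    static String partFileName(int subtaskIndex, long partCounter) {
        return "part-" + subtaskIndex + "-" + partCounter;
    }

    // a new part file is started when the current one exceeds the size or age limit
    static boolean shouldRoll(long currentSizeBytes, long maxPartSizeBytes,
                              long ageMillis, long rolloverIntervalMillis) {
        return currentSizeBytes > maxPartSizeBytes || ageMillis > rolloverIntervalMillis;
    }

    public static void main(String[] args) {
        System.out.println(partFileName(1, 17));                                    // part-1-17
        System.out.println(shouldRoll(400L << 20, 384L << 20, 0L, Long.MAX_VALUE)); // true: 400MB > 384MB default
    }
}
```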
   

[GitHub] flink issue #6281: [FLINK-9750] Add new StreamingFileSink with ResumableWrit...

2018-07-11 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6281
  
Hi @aljoscha and @StephanEwen . I have updated the PR, please have a look.


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201342618
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 ---
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.Writer;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.ResumableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Sink that emits its input elements to {@link FileSystem} files within buckets. This is
+ * integrated with the checkpointing mechanism to provide exactly once semantics.
+ *
+ * When creating the sink a {@code basePath} must be specified. The base directory contains
+ * one directory for every bucket. The bucket directories themselves contain several part files,
+ * with at least one for each parallel subtask of the sink which is writing data to that bucket.
+ * These part files contain the actual output data.
+ *
+ * The sink uses a {@link Bucketer} to determine in which bucket directory each element should
+ * be written to inside the base directory. The {@code Bucketer} can, for example, use time or
+ * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
+ * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
+ * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
+ *
+ * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
+ * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
+ * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask.
+ * When a part file becomes bigger than the user-specified part size or when the part file becomes older
+ * than the user-specified roll over interval the current part file is closed, the part counter is increased
+ * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
+ * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and
   

[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201059374
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
 ---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.Writer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.ResumableWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A bucket is the directory organization of the output of the {@link BucketingSink}.
+ *
+ * For each incoming element in the {@code BucketingSink}, the user-specified
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is
+ * queried to see in which bucket this element should be written to.
+ */
+public class Bucket {
+
+   private static final String PART_PREFIX = "part";
+
+   private final Path bucketPath;
+
+   private int subtaskIndex;
+
+   private long partCounter;
+
+   private long creationTime;
+
+   private long lastWrittenTime;
+
+   private final long maxPathSize;
+
+   private final long rolloverTime;
+
+   private final long inactivityTime;
+
+   private final Writer outputFormatWriter;
+
+   private final ResumableWriter fsWriter;
+
+   private RecoverableFsDataOutputStream currentOpenPartStream;
+
+   private List<ResumableWriter.CommitRecoverable> pending = new ArrayList<>();
+
+   private Map<Long, List<ResumableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>();
+
+   public Bucket(
+   ResumableWriter fsWriter,
+   int subtaskIndex,
+   Path bucketPath,
+   long initialPartCounter,
+   long maxPartSize,
+   long rolloverTime,
+   long inactivityTime,
+   Writer writer,
+   BucketState bucketstate) throws IOException {
+
+   this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer);
+
+   // the constructor must have already initialized the filesystem writer
+   Preconditions.checkState(fsWriter != null);
+
+   // we try to resume the previous in-progress file, if the filesystem
+   // supports such operation. If not, we just commit the file and start fresh.
+
+   final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress();
+   if (resumable != null) {
+   this.currentOpenPartStream = fsWriter.recover(resumable);
+   this.creationTime = bucketstate.getCreationTime();
+   }
+
+   // we commit pending files for previous checkpoints up to the last successful one
+   // (from which we are recovering)
+   for (List<ResumableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) {
+   for (ResumableWriter.CommitRecoverable commitable: commitables) {
+   fsWriter.recoverForCommit(commitable).commit();
+   }
+   }
+
+   this.pending = new ArrayList<>();
+   this.pendingPerCheckpoint = new HashMap<>();
+   }
+
+   public Bucket(
+   ResumableWriter fsWriter,
+   int subtaskIndex,
+   Path bucketPath,
+   long initialPartCounter,
+   lo

[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201059444
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
 ---
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.Writer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.ResumableWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A bucket is the directory organization of the output of the {@link BucketingSink}.
+ *
+ * For each incoming element in the {@code BucketingSink}, the user-specified
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is
+ * queried to see in which bucket this element should be written to.
+ */
+public class Bucket {
+
+   private static final String PART_PREFIX = "part";
+
+   private final Path bucketPath;
+
+   private int subtaskIndex;
+
+   private long partCounter;
+
+   private long creationTime;
+
+   private long lastWrittenTime;
+
+   private final long maxPathSize;
+
+   private final long rolloverTime;
+
+   private final long inactivityTime;
+
+   private final Writer outputFormatWriter;
+
+   private final ResumableWriter fsWriter;
+
+   private RecoverableFsDataOutputStream currentOpenPartStream;
+
+   private List<ResumableWriter.CommitRecoverable> pending = new ArrayList<>();
+
+   private Map<Long, List<ResumableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>();
+
+   public Bucket(
+   ResumableWriter fsWriter,
+   int subtaskIndex,
+   Path bucketPath,
+   long initialPartCounter,
+   long maxPartSize,
+   long rolloverTime,
+   long inactivityTime,
+   Writer writer,
+   BucketState bucketstate) throws IOException {
+
+   this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer);
+
+   // the constructor must have already initialized the filesystem writer
+   Preconditions.checkState(fsWriter != null);
+
+   // we try to resume the previous in-progress file, if the filesystem
+   // supports such operation. If not, we just commit the file and start fresh.
+
+   final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress();
+   if (resumable != null) {
+   this.currentOpenPartStream = fsWriter.recover(resumable);
+   this.creationTime = bucketstate.getCreationTime();
+   }
+
+   // we commit pending files for previous checkpoints up to the last successful one
+   // (from which we are recovering)
+   for (List<ResumableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) {
+   for (ResumableWriter.CommitRecoverable commitable: commitables) {
+   fsWriter.recoverForCommit(commitable).commit();
--- End diff --

You are right!


---


[GitHub] flink issue #6176: [FLINK-9603][connector-filesystem] fix part indexing, whe...

2018-07-09 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6176
  
Hi @kent2171 ! Do not worry, I can fix it ;) Thanks for having a look!



---


[GitHub] flink issue #6176: [FLINK-9603][connector-filesystem] fix part indexing, whe...

2018-07-09 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6176
  
Thanks for the work @kent2171 ! I will merge this as soon as Travis gives 
the green light!


---


[GitHub] flink issue #6281: [FLINK-9750] Add new StreamingFileSink with ResumableWrit...

2018-07-09 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6281
  
Hi @xndai ! I will update the outdated design doc and attach it to the JIRA! I will ping you here so you can have a look.


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r200909011
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/Writer.java 
---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Javadoc.
--- End diff --

Thanks for the catch! I will update.


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-07 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/6281

[FLINK-9750] Add new StreamingFileSink with ResumableWriter.

## What is the purpose of the change

This PR is the first step towards introducing a new Streaming Filesystem 
sink that works on top of Flink's filesystem abstraction and provides support 
for both row and block-based formats (like ORC/Parquet).

The current version only supports the LocalFileSystem.

## Brief change log

The first commit introduces the new `ResumableWriter` abstraction and an 
implementation for the `LocalFileSystem`, while the second introduces the new 
`StreamingFileSink`. 
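
For orientation, a hedged sketch of the `ResumableWriter` contract as it is exercised by the `Bucket` code quoted elsewhere in this thread; only the calls visible in those quotes are shown, and the actual interface in the PR may declare more methods with different signatures:

```java
// Hedged sketch reconstructed solely from the calls visible in the quoted
// Bucket.java code (recover(...), recoverForCommit(...).commit()); the real
// ResumableWriter in the PR may look different.
interface ResumableWriterSketch {

    // opaque handle for "resume this in-progress part file", captured at a checkpoint
    interface ResumeRecoverable {}

    // opaque handle for "commit this finished part file", captured at a checkpoint
    interface CommitRecoverable {}

    interface Committer {
        void commit() throws java.io.IOException;
    }

    // reopen a previously in-progress part file and keep appending to it
    java.io.OutputStream recover(ResumeRecoverable resumable) throws java.io.IOException;

    // obtain a committer that can publish a pending part file after recovery
    Committer recoverForCommit(CommitRecoverable commitable) throws java.io.IOException;
}
```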

## Verifying this change

This change adds tests in the `LocalStreamingFileSinkTest` and the `BucketStateSerializerTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? not documented yet

**NOTE TO REVIEWER**: Logging is still missing.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink bucketing-local-inv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6281


commit 64cdd7b57f6f71db3fa8fe90d9eb37e784b9ae56
Author: Stephan Ewen 
Date:   2018-06-29T17:27:58Z

[hotfix] [core] Remove unused class AbstractMultiFSDataInputStream

commit 2a7ed070dd7b57a58a7588b7ce03c4032a3283fc
Author: Stephan Ewen 
Date:   2018-06-29T17:15:54Z

[FLINK-9751][filesystem] Add PersistentResumableWriter interface.

commit da64fedced17cef7d53448dac360a26ae9d32204
Author: kkloudas 
Date:   2018-07-06T14:38:08Z

[FLINK-9750] Add new StreamingFileSink on top of the ResumableWriter.

This commit introduces the new sink and tests it on the LocalFileSystem.




---


[GitHub] flink issue #5342: [FLINK-8479] Timebounded stream join

2018-07-05 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5342
  
Thanks for the work @florianschmidt1994 ! Merging this.


---


[GitHub] flink pull request #6171: [FLINK-9593] Unified After Match semantics with SQ...

2018-06-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6171#discussion_r198474417
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -330,77 +328,85 @@ private boolean isStateTimedOut(final 
ComputationState state, final long timesta
}
}
 
-   discardComputationStatesAccordingToStrategy(
-   sharedBuffer, computationStates, result, afterMatchSkipStrategy);
+   if (!potentialMatches.isEmpty()) {
+   nfaState.setStateChanged();
+   }
+
+   List<Map<String, List<T>>> result = new ArrayList<>();
+   if (afterMatchSkipStrategy.isSkipStrategy()) {
+   processMatchesAccordingToSkipStrategy(sharedBuffer,
+   nfaState,
+   afterMatchSkipStrategy,
+   potentialMatches,
+   result);
+   } else {
+   for (ComputationState match : potentialMatches) {
+   result.add(sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(match.getPreviousBufferEntry(),
--- End diff --

Instead of accessing the state for every match, why not pass all the matches to the shared buffer and try to fetch the common events only once? If two matches A and B share the event with id = 2, we fetch it from state only once.
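
A hedged sketch of the batching idea; both the type and the batch method below are hypothetical stand-ins, not existing Flink API:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hedged sketch of the suggestion above: instead of materializing every match
// separately, hand all potential matches to the shared buffer so that an event
// shared between matches (e.g. the event with id = 2) is read from state only
// once. materializeMatches(...) is a hypothetical batch API.
interface BatchedSharedBufferSketch<T> {

    // events keyed by pattern name, one map per match
    // (mirrors the Map<String, List<T>> result shape used in the quoted diff)
    List<Map<String, List<T>>> materializeMatches(Collection<?> potentialMatches);
}
```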


---


[GitHub] flink pull request #6171: [FLINK-9593] Unified After Match semantics with SQ...

2018-06-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6171#discussion_r198473426
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.ComputationState;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ */
+public abstract class AfterMatchSkipStrategy implements Serializable {
+
+   private static final long serialVersionUID = -4048930333619068531L;
+
+   /**
+* Discards every partial match that contains event of the match 
preceding the first of *PatternName*.
+*
+* @param patternName the pattern name to skip to
+* @return the created AfterMatchSkipStrategy
+*/
+   public static AfterMatchSkipStrategy skipToFirst(String patternName) {
+   return new SkipToFirstStrategy(patternName);
+   }
+
+   /**
+* Discards every partial match that contains event of the match 
preceding the last of *PatternName*.
+*
+* @param patternName the pattern name to skip to
+* @return the created AfterMatchSkipStrategy
+*/
+   public static AfterMatchSkipStrategy skipToLast(String patternName) {
+   return new SkipToLastStrategy(patternName);
+   }
+
+   /**
+* Discards every partial match that contains event of the match.
+*
+* @return the created AfterMatchSkipStrategy
+*/
+   public static AfterMatchSkipStrategy skipPastLastEvent() {
+   return SkipPastLastStrategy.INSTANCE;
+   }
+
+   /**
+* Every possible match will be emitted.
+*
+* @return the created AfterMatchSkipStrategy
+*/
+   public static AfterMatchSkipStrategy noSkip() {
+   return NoSkipStrategy.INSTANCE;
+   }
+
+   /**
+* Tells if the strategy may skip some matches.
+*
+* @return false if the strategy is NO_SKIP strategy
+*/
+   public abstract boolean isSkipStrategy();
+
+   /**
+* Prunes matches/partial matches based on the chosen strategy.
+*
+* @param partialMatches current partial matches
+* @param matchedResult  already completed matches
+* @param sharedBuffer   corresponding shared buffer
+* @throws Exception thrown if could not access the state
+*/
+   public void prune(
+   Collection partialMatches,
--- End diff --

The name `partialMatches` is misleading because we also use it for the `completedMatches`.


---


[GitHub] flink pull request #6171: [FLINK-9593] Unified After Match semantics with SQ...

2018-06-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6171#discussion_r198472975
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java 
---
@@ -79,18 +98,18 @@ public boolean equals(Object o) {
return false;
}
NFAState nfaState = (NFAState) o;
-   return  Objects.equals(computationStates, 
nfaState.computationStates);
+   return Objects.equals(partialMatches, nfaState.partialMatches);
}
 
@Override
public int hashCode() {
-   return Objects.hash(computationStates, stateChanged);
+   return Objects.hash(partialMatches, stateChanged);
--- End diff --

Same as above.


---


[GitHub] flink pull request #6171: [FLINK-9593] Unified After Match semantics with SQ...

2018-06-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6171#discussion_r198472927
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java 
---
@@ -79,18 +98,18 @@ public boolean equals(Object o) {
return false;
}
NFAState nfaState = (NFAState) o;
-   return  Objects.equals(computationStates, 
nfaState.computationStates);
+   return Objects.equals(partialMatches, nfaState.partialMatches);
--- End diff --

What about the `completedMatches`?
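
A hedged sketch of the change implied by this question, assuming `NFAState` keeps a `completedMatches` field as the surrounding discussion suggests:

```java
// Hedged sketch: compare and hash both queues, not only partialMatches.
// Field names follow the quoted diff; completedMatches is assumed to exist.
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    NFAState nfaState = (NFAState) o;
    return Objects.equals(partialMatches, nfaState.partialMatches)
        && Objects.equals(completedMatches, nfaState.completedMatches);
}

@Override
public int hashCode() {
    return Objects.hash(partialMatches, completedMatches);
}
```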


---


[GitHub] flink pull request #6171: [FLINK-9593] Unified After Match semantics with SQ...

2018-06-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6171#discussion_r198473858
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -330,77 +328,85 @@ private boolean isStateTimedOut(final 
ComputationState state, final long timesta
}
}
 
-   discardComputationStatesAccordingToStrategy(
-   sharedBuffer, computationStates, result, afterMatchSkipStrategy);
+   if (!potentialMatches.isEmpty()) {
+   nfaState.setStateChanged();
+   }
+
+   List<Map<String, List<T>>> result = new ArrayList<>();
+   if (afterMatchSkipStrategy.isSkipStrategy()) {
+   processMatchesAccordingToSkipStrategy(sharedBuffer,
+   nfaState,
+   afterMatchSkipStrategy,
+   potentialMatches,
+   result);
+   } else {
+   for (ComputationState match : potentialMatches) {
+   result.add(sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(match.getPreviousBufferEntry(),
+   match.getVersion()).get(0)));
+   sharedBuffer.releaseNode(match.getPreviousBufferEntry());
+   }
+   }
 
return result;
}
 
-   private void discardComputationStatesAccordingToStrategy(
-   final SharedBuffer sharedBuffer,
-   final Queue computationStates,
-   final Collection>> matchedResult,
-   final AfterMatchSkipStrategy afterMatchSkipStrategy) 
throws Exception {
+   private void processMatchesAccordingToSkipStrategy(
+   SharedBuffer sharedBuffer,
+   NFAState nfaState,
+   AfterMatchSkipStrategy afterMatchSkipStrategy,
+   PriorityQueue potentialMatches,
+   List>> result) throws Exception {
 
-   Set discardEvents = new HashSet<>();
-   switch(afterMatchSkipStrategy.getStrategy()) {
-   case SKIP_TO_LAST:
-   for (Map> resultMap: 
matchedResult) {
-   for (Map.Entry> 
keyMatches : resultMap.entrySet()) {
-   if 
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
-   
discardEvents.addAll(keyMatches.getValue().subList(0, 
keyMatches.getValue().size() - 1));
-   break;
-   } else {
-   
discardEvents.addAll(keyMatches.getValue());
-   }
-   }
-   }
-   break;
-   case SKIP_TO_FIRST:
-   for (Map> resultMap: 
matchedResult) {
-   for (Map.Entry> 
keyMatches : resultMap.entrySet()) {
-   if 
(keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
-   break;
-   } else {
-   
discardEvents.addAll(keyMatches.getValue());
-   }
-   }
-   }
-   break;
-   case SKIP_PAST_LAST_EVENT:
-   for (Map> resultMap: 
matchedResult) {
-   for (List eventList: 
resultMap.values()) {
-   discardEvents.addAll(eventList);
-   }
-   }
-   break;
-   }
-   if (!discardEvents.isEmpty()) {
-   List discardStates = new 
ArrayList<>();
-   for (ComputationState computationState : 
computationStates) {
-   boolean discard = false;
-   Map> partialMatch = 
extractCurrentMatches(sharedBuffer, computationState);
-   for (List list: partialMatch.values()) {
-   for (T e: list) {
-   if (discardEvents.contains(e)) {
-  

[GitHub] flink pull request #6152: [FLINK-9467][metrics][WebUI] Fix watermark display...

2018-06-21 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6152#discussion_r197087153
  
--- Diff: docs/monitoring/metrics.md ---
@@ -1254,15 +1254,15 @@ Thus, in order to infer the metric identifier:
   Counter
 
 
-  Operator
   currentInputWatermark
   
-The last watermark this operator has received (in milliseconds).
-Note: For operators with 2 inputs this is the 
minimum of the last received watermarks.
+The last watermark this operator/tasks has received (in 
milliseconds).
+Note: For operators/tasks with 2 inputs this 
is the minimum of the last received watermarks.
   
   Gauge
 
 
+  Operator
--- End diff --

I would put "Operator Only" to emphasize the diff with the above.


---


[GitHub] flink pull request #6152: [FLINK-9467][metrics][WebUI] Fix watermark display...

2018-06-21 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6152#discussion_r197086995
  
--- Diff: docs/monitoring/metrics.md ---
@@ -1228,7 +1228,7 @@ Thus, in order to infer the metric identifier:
   Meter
 
 
-  Task/Operator
+  Task/Operator
--- End diff --

I would put "Task and Operator" rather than "Task/Operator"


---


[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

2018-06-15 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6130
  
Hi @bowenli86, 
@zentol, @aljoscha, and I all seem to have doubts about the utility of the feature.

Given this, and to keep the JIRA and the list of PRs we have to work on clean, I would suggest closing the PR and the related issue.



---


[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

2018-06-13 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6130
  
Hi @bowenli86 ! 

Why not have a `flatMap` after the `readFile`, where for every incoming element you emit as many copies as you want?

Personally, I am not so fond of adding methods to the public APIs for such specific use cases.
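
A hedged sketch of the suggested workaround: read the file once and emit every record several times downstream. `readTextFile(...)`, the element type, and `numCopies` are illustrative assumptions, not taken from the PR under discussion:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class RepeatLinesSketch {
    public static void main(String[] args) throws Exception {
        final int numCopies = 3; // how many times each element should be emitted
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines = env.readTextFile("file:///tmp/input.txt");
        DataStream<String> repeated = lines
            .flatMap((String line, Collector<String> out) -> {
                for (int i = 0; i < numCopies; i++) {
                    out.collect(line);
                }
            })
            .returns(Types.STRING); // needed because the lambda erases the output type

        repeated.print();
        env.execute("repeat-lines-sketch");
    }
}
```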


---


[GitHub] flink issue #5342: [FLINK-8479] Timebounded stream join

2018-06-13 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5342
  
Thanks @florianschmidt1994 . I will, but maybe not today.


---


[GitHub] flink issue #5960: [Flink-8725] Separate state from NFA in CEP library

2018-06-13 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5960
  
@dawidwys Also when you merge the other one, could you also close this PR?


---


[GitHub] flink issue #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-06-13 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6059
  
It would be great if @Aitozi could also report any numbers he has, so that 
we see the benefits of the change.


---


[GitHub] flink issue #6126: [FLINK-9530][metrics] Fix numRecords task metric for chai...

2018-06-11 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6126
  
I got this; it is just that I am wondering how useful this is, given that the operators/functions are chained.


---


[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-06-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r194473454
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -296,42 +292,31 @@ public void resetNFAChanged() {
if (shouldDiscardPath) {
// a stop state was reached in this branch. 
release branch which results in removing previous event from
// the buffer
-   for (final ComputationState state : 
statesToRetain) {
-   eventSharedBuffer.release(
-   
NFAStateNameHandler.getOriginalNameFromInternal(
-   
state.getPreviousState().getName()),
-   state.getEvent(),
-   state.getTimestamp(),
-   state.getCounter());
+   for (final ComputationState state : 
statesToRetain) {
+   
sharedBuffer.releaseNode(state.getPreviousBufferEntry());
}
} else {
computationStates.addAll(statesToRetain);
}
 
}
 
-   discardComputationStatesAccordingToStrategy(computationStates, 
result, afterMatchSkipStrategy);
-
-   // prune shared buffer based on window length
-   if (windowTime > 0L) {
-   long pruningTimestamp = timestamp - windowTime;
-
-   if (pruningTimestamp < timestamp) {
-   // the check is to guard against underflows
+   discardComputationStatesAccordingToStrategy(
+   sharedBuffer, computationStates, result, 
afterMatchSkipStrategy);
 
-   // remove all elements which are expired
-   // with respect to the window length
-   if (eventSharedBuffer.prune(pruningTimestamp)) {
-   nfaChanged = true;
-   }
-   }
+   if (event.getEvent() == null) {
--- End diff --

Actually, I would recommend having a method, `nfa.advanceTime()` or `nfa.clearUpTo(timestamp)`, that we call from `AbstractKeyedCEPPatternOperator.advanceTime()` after calling `processEvent()`. This way it is clearer what it does and when it is called. What do you think?
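
A hedged sketch of the proposed call order inside the operator; `advanceTime(...)` is the method being suggested here, so its exact signature is hypothetical, and the surrounding fields are simplified:

```java
// Hedged sketch of the refactoring proposed above (simplified operator code,
// hypothetical advanceTime signature): first process the element, then prune
// the shared buffer and timed-out partial matches explicitly.
void onElement(T event, long timestamp) throws Exception {
    nfa.process(sharedBuffer, nfaState, event, timestamp); // existing per-event processing
    nfa.advanceTime(sharedBuffer, nfaState, timestamp);    // proposed: explicit time-based cleanup
}
```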


---


[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-06-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r194470370
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -296,42 +292,31 @@ public void resetNFAChanged() {
if (shouldDiscardPath) {
// a stop state was reached in this branch. 
release branch which results in removing previous event from
// the buffer
-   for (final ComputationState state : 
statesToRetain) {
-   eventSharedBuffer.release(
-   
NFAStateNameHandler.getOriginalNameFromInternal(
-   
state.getPreviousState().getName()),
-   state.getEvent(),
-   state.getTimestamp(),
-   state.getCounter());
+   for (final ComputationState state : 
statesToRetain) {
+   
sharedBuffer.releaseNode(state.getPreviousBufferEntry());
}
} else {
computationStates.addAll(statesToRetain);
}
 
}
 
-   discardComputationStatesAccordingToStrategy(computationStates, 
result, afterMatchSkipStrategy);
-
-   // prune shared buffer based on window length
-   if (windowTime > 0L) {
-   long pruningTimestamp = timestamp - windowTime;
-
-   if (pruningTimestamp < timestamp) {
-   // the check is to guard against underflows
+   discardComputationStatesAccordingToStrategy(
+   sharedBuffer, computationStates, result, 
afterMatchSkipStrategy);
 
-   // remove all elements which are expired
-   // with respect to the window length
-   if (eventSharedBuffer.prune(pruningTimestamp)) {
-   nfaChanged = true;
-   }
-   }
+   if (event.getEvent() == null) {
--- End diff --

I see. Then at least we should have a comment there explaining it. 


---


[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-06-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6059#discussion_r194468194
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -296,42 +292,31 @@ public void resetNFAChanged() {
if (shouldDiscardPath) {
// a stop state was reached in this branch. 
release branch which results in removing previous event from
// the buffer
-   for (final ComputationState state : 
statesToRetain) {
-   eventSharedBuffer.release(
-   
NFAStateNameHandler.getOriginalNameFromInternal(
-   
state.getPreviousState().getName()),
-   state.getEvent(),
-   state.getTimestamp(),
-   state.getCounter());
+   for (final ComputationState state : 
statesToRetain) {
+   
sharedBuffer.releaseNode(state.getPreviousBufferEntry());
}
} else {
computationStates.addAll(statesToRetain);
}
 
}
 
-   discardComputationStatesAccordingToStrategy(computationStates, 
result, afterMatchSkipStrategy);
-
-   // prune shared buffer based on window length
-   if (windowTime > 0L) {
-   long pruningTimestamp = timestamp - windowTime;
-
-   if (pruningTimestamp < timestamp) {
-   // the check is to guard against underflows
+   discardComputationStatesAccordingToStrategy(
+   sharedBuffer, computationStates, result, 
afterMatchSkipStrategy);
 
-   // remove all elements which are expired
-   // with respect to the window length
-   if (eventSharedBuffer.prune(pruningTimestamp)) {
-   nfaChanged = true;
-   }
-   }
+   if (event.getEvent() == null) {
--- End diff --

Why `== null` here? I may be missing sth.


---


[GitHub] flink issue #6126: [FLINK-9530][metrics] Fix numRecords task metric for chai...

2018-06-11 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6126
  
If this is correct, then I suppose we can remove it altogether so that the 
code is also cleaner, right?


---


[GitHub] flink issue #6108: [FLINK-9367] [Streaming Connectors] Allow to do truncate(...

2018-06-11 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6108
  
@zhangxinyu1 as soon as this sink is ready, I believe that the existing 
File Source will be able to read the output of the Bucketing Sink. As far as 
bandwidth limitations are concerned, could you elaborate a bit on what you 
mean? You want to tell the source to read at speed X records/sec?


---


[GitHub] flink issue #6108: [FLINK-9367] [Streaming Connectors] Allow to do truncate(...

2018-06-04 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6108
  
Thanks for the useful input here @zhangxinyu1 and @StephanEwen. As soon as I have sth concrete, I will create the JIRA and post it here.


---


[GitHub] flink issue #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-06-01 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6059
  
Hi @dawidwys! Thanks for the work. 

The changes seem really good. As we discussed privately, I did a review and you can find the comments in the branch I sent you.

The only things I would like to write here as a reminder are:

1) as a future commit, we could in some cases do the cleaning up while discovering the partial matches, i.e. do the `releaseNode()` from within `extractCurrentMatches()` (whenever appropriate). This could actually be a separate JIRA.

2) when migrating from an old version, we should ignore potential 
"dead-end" edges (edges that point to nodes that have been removed) in the 
partial match buffer. 


---


[GitHub] flink pull request #6097: [FLINK-9470] Allow querying the key in KeyedProces...

2018-05-29 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6097#discussion_r191460728
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -50,6 +52,30 @@
@Rule
public ExpectedException expectedException = ExpectedException.none();
 
+   @Test
+   public void testKeyQuerying() throws Exception {
+
+   KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
+   new KeyedProcessOperator<>(new KeyQueryingProcessFunction());
+
+   OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
+   new KeyedOneInputStreamOperatorTestHarness<>(operator, (in) -> in.f0, BasicTypeInfo.INT_TYPE_INFO);
+
--- End diff --

same here.


---


[GitHub] flink pull request #6097: [FLINK-9470] Allow querying the key in KeyedProces...

2018-05-29 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6097#discussion_r191460316
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -50,6 +52,30 @@
@Rule
public ExpectedException expectedException = ExpectedException.none();
 
+   @Test
+   public void testKeyQuerying() throws Exception {
+
+   KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
--- End diff --

The `OneInputStreamOperatorTestHarness` is `AutoCloseable`, so I would recommend going with
```
try (harness = ...) {
...
}
```
and removing the explicit call to `harness.close()`. This is a nice practice to start enforcing in new tests, as it cleans up any leaks in case of exceptions.
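
A hedged sketch of that pattern, continuing the test quoted in the diff above; the processed record and generics are illustrative only:

```java
// Hedged sketch of the try-with-resources pattern recommended above, using the
// harness types from the quoted diff; the record being processed is illustrative.
try (OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
        new KeyedOneInputStreamOperatorTestHarness<>(operator, in -> in.f0, BasicTypeInfo.INT_TYPE_INFO)) {

    testHarness.open();
    testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "hello"), 12L));

    // assertions on testHarness.getOutput() go here;
    // close() runs automatically, even if an assertion throws
}
```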


---


[GitHub] flink issue #6028: [FLINK-9356] Improve error message for when queryable sta...

2018-05-29 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/6028
  
LGTM, so +1 and I will merge later. 
Thanks for the work @yanghua and for the review @florianschmidt1994 .


---


[GitHub] flink pull request #6028: [FLINK-9356] Improve error message for when querya...

2018-05-17 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6028#discussion_r189015635
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 ---
@@ -225,7 +225,9 @@ private void executeActionAsync(
 
return location;
} else {
-   return FutureUtils.completedExceptionally(new UnknownLocationException("Could not contact the state location oracle to retrieve the state location."));
+   return FutureUtils.completedExceptionally(
+   new UnknownLocationException("Could not contact the state location oracle to retrieve the state location for state="
+   + queryableStateName + " of job=" + jobId + ", the caused reason maybe the state is not ready or there is no job exists."));
--- End diff --

This message is pretty verbose and it contains grammatical errors. If I were to write this, it would be something like:

```
Could not contact the state location oracle to retrieve the location for state=QSName of job=JobID. The reason can be that the state is not ready or that the job does not exist.
```

But before putting this in, it would be helpful if @florianschmidt1994 
commented on what he thinks, as he is the one that opened the issue.
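
A hedged sketch of the reworded failure path, combining the suggestion above with the identifiers already present in the quoted diff (`FutureUtils`, `UnknownLocationException`, `queryableStateName`, `jobId`); the final wording is still open pending review:

```java
// Hedged sketch only; the exact message wording is still under discussion above.
return FutureUtils.completedExceptionally(
    new UnknownLocationException(
        "Could not contact the state location oracle to retrieve the location for state="
            + queryableStateName + " of job=" + jobId
            + ". The reason can be that the state is not ready yet or that the job does not exist."));
```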


---


[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.

2018-05-16 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5955
  
Hi @tzulitai ! Thanks for the review. I integrated most of your comments. The only one I left out is the one about merging the checkpointing and the checking. I am not against that. It is just that, the way it is now, it is aligned with the `StatefulJobSavepointMigrationITCase`. If we were to make the change, we should change both, and I would prefer to do that in a separate commit.

Let me know what you think about the current changes and, if you are ok with them, I can merge.


---


[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188614213
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection> parameters 
() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public 
StatefulJobWBroadcastStateMigrationITCase(Tuple2 
testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   env.setStateBa

[GitHub] flink issue #5922: [FLINK-8780] [docs] Add Broadcast State documentation.

2018-05-14 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5922
  
Thanks for the review! Merging this.


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-14 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r187924456
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,279 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
+parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different 
names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolves over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Item, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint<Rule>() {})
+   );
+
+// broadcast the rules and create the broadcast state
+BroadcastStream<Rule> ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
+   as an argument.
+
+
+{% highlight java %}
+DataStream output = colorPartitionedStream
+ .connect(ruleBroadcastStream)
+ .process(
+ 
+ // type arguments in our 
KeyedBroadcas
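
The quoted snippet breaks off at the `process()` call. A hedged sketch of how such a call continues, following the `KeyedBroadcastProcessFunction` shape described in the prose above (key type, non-broadcast input, broadcast input, output); the concrete type arguments and method bodies are illustrative:

```java
// Hedged completion of the truncated snippet; types and bodies are illustrative.
DataStream<String> output = colorPartitionedStream
    .connect(ruleBroadcastStream)
    .process(
        new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

            @Override
            public void processElement(Item item, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // read-only access to the broadcast state: evaluate `item` against the stored rules
            }

            @Override
            public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) throws Exception {
                // read-write access to the broadcast state: store the new or updated rule
            }
        });
```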

[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-14 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r187922328
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,279 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
+parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different 
names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolves over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+                        .keyBy(new KeySelector<Item, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) use the provided `MapStateDescriptor` to create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
+        "RulesBroadcastState",
+        BasicTypeInfo.STRING_TYPE_INFO,
+        TypeInformation.of(new TypeHint<Rule>() {}));
+
+// broadcast the rules and create the broadcast state
+BroadcastStream<Rule> ruleBroadcastStream = ruleStream
+                        .broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
--- End diff --

It is not even available as an option to call `connect()` in this case. The 
`broadcast(descriptor)` will give you a `BroadcastStream` back that has no 
available transformations. You can only use it as an argument for a `connect()`.
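
For completeness, a minimal sketch of what this means in code, reusing the `ruleStream` and `ruleStateDescriptor` from the quoted documentation together with the hypothetical `Item`/`Rule` types (`itemStream` is likewise an assumed name):

{% highlight java %}
// the result of broadcast(descriptor) offers no transformations of its own ...
BroadcastStream<Rule> rules = ruleStream.broadcast(ruleStateDescriptor);

// ... its only use is as the argument of connect() on the other, non-broadcast stream
BroadcastConnectedStream<Item, Rule> connected = itemStream.connect(rules);
{% endhighlight %}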


---


[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.

2018-05-14 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5955
  
Could you review it @tzulitai ?


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-05-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r187677168
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Streaming application that creates an {@link Email} pojo with random 
ids and increasing
+ * timestamps and passes it to a stateful {@link 
org.apache.flink.api.common.functions.FlatMapFunction},
+ * where it is exposed as queryable state.
+ */
+public class QsStateProducer {
+
+   public static final String QUERYABLE_STATE_NAME = "state";
+   public static final String STATE_NAME = "state";
+
+   public static void main(final String[] args) throws Exception {
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   ParameterTool tool = ParameterTool.fromArgs(args);
+   String tmpPath = tool.getRequired("tmp-dir");
+   String stateBackendType = tool.getRequired("state-backend");
+
+   StateBackend stateBackend;
+   switch (stateBackendType) {
+   case "rocksdb":
+   stateBackend = new RocksDBStateBackend(tmpPath);
+   break;
+   case "fs":
+   stateBackend = new FsStateBackend(tmpPath);
+   break;
+   case "memory":
+   stateBackend = new MemoryStateBackend();
+   break;
+   default:
+   throw new RuntimeException("Unsupported state 
backend " + stateBackendType);
+   }
+
+   env.setStateBackend(stateBackend);
+   env.enableCheckpointing(1000L);
+   env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+   env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
+
+   env.addSource(new EmailSource())
+   .keyBy(new KeySelector<Email, String>() {
+
+       private static final long serialVersionUID = -1480525724620425363L;
+
+       @Override
+       public String getKey(Email value) throws Exception {
+   return ""

[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-05-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r187676881
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh ---
@@ -0,0 +1,120 @@
+#!/usr/bin/env bash

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/queryable_state_base.sh
+

+QUERYABLE_STATE_SERVER_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateProducer.jar

+QUERYABLE_STATE_CLIENT_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateClient.jar
+
+#
+# Test that queryable state works as expected with HA mode when restarting 
a taskmanager
+#
+# The general outline is like this:
+# 1. start cluster in HA mode with 1 TM
+# 2. start a job that exposes queryable state from a mapstate with 
increasing num. of keys
+# 3. query the state with a queryable state client and expect no error to 
occur
+# 4. stop the TM
+# 5. check how many keys were in our mapstate at the time of the latest 
snapshot
+# 6. start a new TM
+# 7. query the state with a queryable state client and retrieve the number 
of elements
+#in the mapstate
+# 8. expect the number of elements in the mapstate after restart of TM to 
be > number of elements
+#at last snapshot
+#
+# Globals:
+#   QUERYABLE_STATE_SERVER_JAR
+#   QUERYABLE_STATE_CLIENT_JAR
+# Arguments:
+#   None
+# Returns:
+#   None
+#
+function run_test() {
+local EXIT_CODE=0
+local PARALLELISM=1 # parallelism of queryable state app
+local PORT="9069" # port of queryable state server
+
+clean_stdout_files # to ensure there are no files accidentally left 
behind by previous tests
+link_queryable_state_lib
+start_ha_cluster
+
+local JOB_ID=$(${FLINK_DIR}/bin/flink run \
+-p ${PARALLELISM} \
+-d ${QUERYABLE_STATE_SERVER_JAR} \
+--state-backend "rocksdb" \
+--tmp-dir file://${TEST_DATA_DIR} \
+| awk '{print $NF}' | tail -n 1)
+
+wait_job_running ${JOB_ID}
+
+sleep 20 # sleep a little to have some state accumulated
+
+SERVER=$(get_queryable_state_server_ip)
+PORT=$(get_queryable_state_proxy_port)
+
+echo SERVER: ${SERVER}
+echo PORT: ${PORT}
+
+java -jar ${QUERYABLE_STATE_CLIENT_JAR} \
+--host ${SERVER} \
+--port ${PORT} \
+--iterations 1 \
+--job-id ${JOB_ID}
+
+if [ $? != 0 ]; then
+echo "An error occurred when executing queryable state client"
+exit 1
+fi
+
+kill_random_taskmanager
+
+latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" 
| tail -n 1 | awk '{print $4}')
+echo "Latest snapshot count was ${latest_snapshot_count}"
+
+sleep 10 # this is a little longer than the heartbeat timeout so that 
the TM is gone
+
+start_and_wait_for_tm
+
+wait_job_running ${JOB_ID}
+
--- End diff --

Instead of just waiting for the job to be running, it is safer to ask 
through `REST` for the number of successful checkpoints right after killing 
the TM, and then expect to see more successful checkpoints after the new TM 
is up. This guarantees that the backend is initialized properly, and it can 
be done similarly to how it is done in `test_ha.sh`.
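
A rough sketch of such a check, assuming the default REST port 8081 and the `/jobs/<job-id>/checkpoints` endpoint; the function names below are made up for illustration and are not part of the existing test scripts:

{% highlight bash %}
# number of completed checkpoints reported by the REST API for the given job
function get_completed_checkpoints() {
    local job_id=$1
    curl -s "http://localhost:8081/jobs/${job_id}/checkpoints" \
        | grep -Eo '"completed":[0-9]+' | head -n 1 | awk -F':' '{print $2}'
}

# block until the completed-checkpoint count grows past the given baseline
function wait_for_more_checkpoints() {
    local job_id=$1
    local baseline=$2
    local completed=$(get_completed_checkpoints ${job_id})
    while [ -z "${completed}" ] || [ "${completed}" -le "${baseline}" ]; do
        sleep 2
        completed=$(get_completed_checkpoints ${job_id})
    done
    echo "Saw ${completed} completed checkpoints (baseline was ${baseline})"
}
{% endhighlight %}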


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-05-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r187676635
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh ---
@@ -0,0 +1,120 @@
+#!/usr/bin/env bash

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/queryable_state_base.sh
+

+QUERYABLE_STATE_SERVER_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateProducer.jar

+QUERYABLE_STATE_CLIENT_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateClient.jar
+
+#
+# Test that queryable state works as expected with HA mode when restarting 
a taskmanager
+#
+# The general outline is like this:
+# 1. start cluster in HA mode with 1 TM
+# 2. start a job that exposes queryable state from a mapstate with 
increasing num. of keys
+# 3. query the state with a queryable state client and expect no error to 
occur
+# 4. stop the TM
+# 5. check how many keys were in our mapstate at the time of the latest 
snapshot
+# 6. start a new TM
+# 7. query the state with a queryable state client and retrieve the number 
of elements
+#in the mapstate
+# 8. expect the number of elements in the mapstate after restart of TM to 
be > number of elements
+#at last snapshot
+#
+# Globals:
+#   QUERYABLE_STATE_SERVER_JAR
+#   QUERYABLE_STATE_CLIENT_JAR
+# Arguments:
+#   None
+# Returns:
+#   None
+#
+function run_test() {
+local EXIT_CODE=0
+local PARALLELISM=1 # parallelism of queryable state app
+local PORT="9069" # port of queryable state server
+
+clean_stdout_files # to ensure there are no files accidentally left 
behind by previous tests
+link_queryable_state_lib
+start_ha_cluster
+
+local JOB_ID=$(${FLINK_DIR}/bin/flink run \
+-p ${PARALLELISM} \
+-d ${QUERYABLE_STATE_SERVER_JAR} \
+--state-backend "rocksdb" \
+--tmp-dir file://${TEST_DATA_DIR} \
+| awk '{print $NF}' | tail -n 1)
+
+wait_job_running ${JOB_ID}
+
+sleep 20 # sleep a little to have some state accumulated
--- End diff --

Instead of waiting, it is safer to ask through `REST` for the number of successful 
checkpoints right after killing the TM, and then expect to see more successful 
checkpoints after the new TM is up. This guarantees that the backend is initialized 
properly, and it can be done similarly to how it is done in `test_ha.sh`.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-05-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r187675826
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh ---
@@ -0,0 +1,120 @@
+#!/usr/bin/env bash

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/queryable_state_base.sh
+

+QUERYABLE_STATE_SERVER_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateProducer.jar

+QUERYABLE_STATE_CLIENT_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateClient.jar
+
+#
+# Test that queryable state works as expected with HA mode when restarting 
a taskmanager
+#
+# The general outline is like this:
+# 1. start cluster in HA mode with 1 TM
+# 2. start a job that exposes queryable state from a mapstate with 
increasing num. of keys
+# 3. query the state with a queryable state client and expect no error to 
occur
+# 4. stop the TM
+# 5. check how many keys were in our mapstate at the time of the latest 
snapshot
+# 6. start a new TM
+# 7. query the state with a queryable state client and retrieve the number 
of elements
+#in the mapstate
+# 8. expect the number of elements in the mapstate after restart of TM to 
be > number of elements
+#at last snapshot
+#
+# Globals:
+#   QUERYABLE_STATE_SERVER_JAR
+#   QUERYABLE_STATE_CLIENT_JAR
+# Arguments:
+#   None
+# Returns:
+#   None
+#
+function run_test() {
+local EXIT_CODE=0
+local PARALLELISM=1 # parallelism of queryable state app
+local PORT="9069" # port of queryable state server
+
+clean_stdout_files # to ensure there are no files accidentally left 
behind by previous tests
+link_queryable_state_lib
+start_ha_cluster
--- End diff --

We do not need an HA cluster. A normal cluster would be enough.
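
In that case the setup in `run_test` could be reduced to something like the following, assuming a `start_cluster` helper for a plain (non-HA) cluster exists in `common.sh`:

{% highlight bash %}
clean_stdout_files # to ensure there are no files accidentally left behind by previous tests
link_queryable_state_lib
start_cluster      # a plain, non-HA cluster is enough for this test
{% endhighlight %}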


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-05-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r187675012
  
--- Diff: flink-end-to-end-tests/flink-queryable-state-test/pom.xml ---
@@ -0,0 +1,134 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-queryable-state-test_${scala.binary.version}</artifactId>
+	<name>flink-queryable-state-test</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
+			<version>1.6-SNAPSHOT</version>
--- End diff --

the version should be `${project.version}`


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-05-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r187674099
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Streaming application that creates an {@link Email} pojo with random 
ids and increasing
+ * timestamps and passes it to a stateful {@link 
org.apache.flink.api.common.functions.FlatMapFunction},
+ * where it is exposed as queryable state.
+ */
+public class QsStateProducer {
+
+   public static final String QUERYABLE_STATE_NAME = "state";
+   public static final String STATE_NAME = "state";
+
+   public static void main(final String[] args) throws Exception {
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   ParameterTool tool = ParameterTool.fromArgs(args);
+   String tmpPath = tool.getRequired("tmp-dir");
+   String stateBackendType = tool.getRequired("state-backend");
+
+   StateBackend stateBackend;
+   switch (stateBackendType) {
+   case "rocksdb":
+   stateBackend = new RocksDBStateBackend(tmpPath);
+   break;
+   case "fs":
+   stateBackend = new FsStateBackend(tmpPath);
+   break;
+   case "memory":
+   stateBackend = new MemoryStateBackend();
+   break;
+   default:
+   throw new RuntimeException("Unsupported state 
backend " + stateBackendType);
+   }
+
+   env.setStateBackend(stateBackend);
+   env.enableCheckpointing(1000L);
+   env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+   env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
+
+   env.addSource(new EmailSource())
+   .keyBy(new KeySelector<Email, String>() {
+
+       private static final long serialVersionUID = -1480525724620425363L;
+
+       @Override
+       public String getKey(Email value) throws Exception {
+   return ""

[GitHub] flink issue #5993: [FLINK-9336][state] fix deserialization problem for query...

2018-05-11 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5993
  
Thanks for the work @sihuazhou and for reporting this @florianschmidt1994 ! 
This fixes the problem described in the JIRA but I will look at the PR a bit 
more thoroughly on Monday at the latest and then merge it if everything is ok.


---


[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.

2018-05-08 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5955
  
Could you review it @aljoscha ?


---


[GitHub] flink issue #5495: [FLINK-8659] Add migration itcases for broadcast state.

2018-05-04 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5495
  
I close this and I will open an updated one.


---


[GitHub] flink pull request #5495: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-04 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/5495


---


[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-04 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5955

[FLINK-8659] Add migration itcases for broadcast state.

As the name implies, this PR adds migration tests for the newly introduced 
broadcast state.

For the `scala` case, more refactoring is required so that the shared code 
between the tests is better distributed, but this is a broader refactoring. It 
requires the same work that was done for the previous case of the `java` 
migration tests.

R @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink migration-inv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5955.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5955


commit 9ae20e37b557e9ca482bd61cb57e8a6001a7eb6e
Author: kkloudas 
Date:   2018-05-03T08:05:13Z

[FLINK-8659] Add migration itcases for broadcast state.




---


[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...

2018-05-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5860#discussion_r185731699
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -908,6 +929,20 @@ private void 
handlePendingFilesForPreviousCheckpoints(Map> pe
return this;
}
 
+   /**
+* Sets the roll over interval in milliseconds.
+*
+*
+* When a bucket part file is older than the roll over interval, a 
new bucket part file is
+* started and the old one is closed. The name of the bucket file 
depends on the {@link Bucketer}.
+*
+* @param batchRolloverInterval The roll over interval in milliseconds
+*/
+   public BucketingSink setBatchRolloverInterval(long 
batchRolloverInterval) {
+   this.batchRolloverInterval = batchRolloverInterval;
+   return this;
--- End diff --

Please add checks for invalid configs like negative values.
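
A minimal sketch of the suggested validation, assuming the setter stays on `BucketingSink<T>` and that non-positive intervals should be rejected:

{% highlight java %}
public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
	if (batchRolloverInterval <= 0L) {
		throw new IllegalArgumentException(
			"The batch rollover interval must be positive, but was: " + batchRolloverInterval);
	}
	this.batchRolloverInterval = batchRolloverInterval;
	return this;
}
{% endhighlight %}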


---


[GitHub] flink issue #5922: [FLINK-8780] [docs] Add Broadcast State documentation.

2018-05-02 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5922
  
Thanks a lot for the reviews! I pushed a commit that integrates your 
comments. Let me know if now it looks ok.


---


[GitHub] flink issue #5922: [FLINK-8780] [docs] Add Broadcast State documentation.

2018-05-02 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5922
  
This seems like a nice suggestion.
I will do that and let’s see how it reads afterwards.

> On May 2, 2018, at 12:21 PM, Tzu-Li Tai  wrote:
> 
> @tzulitai commented on this pull request.
> 
> In docs/dev/stream/state/broadcast_state.md 
<https://github.com/apache/flink/pull/5922#discussion_r185452174>:
> 
> > +
> +* ToC
> +{:toc}
> +
> +[Working with State](state.html) described operator state which is 
either **evenly** distributed among the parallel
> +tasks of an operator, or state which **upon restore**, its partial 
(task) states are **unioned** and the whole state is 
> +used to initialize the restored parallel tasks.
> +
> +A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use-cases
> +where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
> +and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
> +natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
> +elements coming from another stream. Having the above type of use-cases 
in mind, broadcast state differs from the rest 
> +of operator states in that:
> + 1. it has a map format,
> + 2. it is only available to streams whose elements are *broadcasted*,
> How about,
> "it is only available to operators which have a broadcasted input stream"?
> 
> This might only be a matter of personal preference, so please take this 
as a grain of salt.
> It's just that I somehow find it easier to understand when thinking in 
terms of states and operators.




---


[GitHub] flink issue #5922: [FLINK-8780] [docs] Add Broadcast State documentation.

2018-05-02 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5922
  
Thanks a lot for the comments @alpinegizmo and @tzulitai ! I integrated 
most of them and I am not sure how to integrate your comment @tzulitai . Do you 
have any proposal on how this can be made clearer?


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-02 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r185449653
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,281 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) described operator state which is either 
**evenly** distributed among the parallel
+tasks of an operator, or state which **upon restore**, its partial (task) 
states are **unioned** and the whole state is 
+used to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use-cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use-cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to streams whose elements are *broadcasted*,
--- End diff --

Here I was only trying to point out that in order to have broadcast state, 
the elements of the stream have to be broadcasted, *i.e.* via 
`broadcast(stateDesc)`. I was not referring to the access rights. This is 
contrary to the other kinds of state, which any stream can have, *e.g.* through the 
runtime context.


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184677692
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows declaring queries in the SQL 
language, a SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
+
+{% top %}
+
+Configuration
+-
+
+The SQL Client can be started with the following optional CLI commands. 
They are discussed in detail in the subsequent paragraphs.
+
+{% highlight text %}
+./bin/sql-client.sh embedded --help
+
+Mode "embedded" submits Flink jobs from the local machine.
+
+  Syntax: embedded [OPTIONS]
+  "embedded" mode options:
+ -d,--defaults   The environment properties with 
whi

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184678745
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows declaring queries in the SQL 
language, a SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
+
+{% top %}
+
+Configuration
+-
+
+The SQL Client can be started with the following optional CLI commands. 
They are discussed in detail in the subsequent paragraphs.
+
+{% highlight text %}
+./bin/sql-client.sh embedded --help
+
+Mode "embedded" submits Flink jobs from the local machine.
+
+  Syntax: embedded [OPTIONS]
+  "embedded" mode options:
+ -d,--defaults   The environment properties with 
whi

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184678478
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows declaring queries in the SQL 
language, a SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
+
+{% top %}
+
+Configuration
+-
+
+The SQL Client can be started with the following optional CLI commands. 
They are discussed in detail in the subsequent paragraphs.
+
+{% highlight text %}
+./bin/sql-client.sh embedded --help
+
+Mode "embedded" submits Flink jobs from the local machine.
+
+  Syntax: embedded [OPTIONS]
+  "embedded" mode options:
+ -d,--defaults   The environment properties with 
whi

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184676876
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows declaring queries in the SQL 
language, a SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
+
+{% top %}
+
+Configuration
+-
+
+The SQL Client can be started with the following optional CLI commands. 
They are discussed in detail in the subsequent paragraphs.
+
+{% highlight text %}
+./bin/sql-client.sh embedded --help
+
+Mode "embedded" submits Flink jobs from the local machine.
+
+  Syntax: embedded [OPTIONS]
+  "embedded" mode options:
+ -d,--defaults   The environment properties with 
whi

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184675777
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows declaring queries in the SQL 
language, a SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
--- End diff --

I would remove the "This command starts the submission service and CLI 
embedded in one application process." sentence. Otherwise, you have to explain: 
what are these processes? What is the difference between the submission service and 
the CLI? ...


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184678308
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows declaring queries in the SQL 
language, a SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be a quite useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command-line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster where table programs can 
be submitted to. For more information about setting up a Flink cluster see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
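For a rough illustration of the difference (the exact rendering in the CLI may 
differ): in changelog mode, the result stream of this query would contain 
entries along the lines of

{% highlight text %}
+ Bob, 1
+ Alice, 1
+ Greg, 1
- Bob, 1
+ Bob, 2
{% endhighlight %}

where the second `Bob` row triggers a retraction of the old count before the 
updated count is inserted, whereas table mode would show the materialized 
final counts:

{% highlight text %}
Bob, 2
Alice, 1
Greg, 1
{% endhighlight %}
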
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
+
+{% top %}
+
+Configuration
+-
+
+The SQL Client can be started with the following optional CLI commands. 
They are discussed in detail in the subsequent paragraphs.
+
+{% highlight text %}
+./bin/sql-client.sh embedded --help
+
+Mode "embedded" submits Flink jobs from the local machine.
+
+  Syntax: embedded [OPTIONS]
+  "embedded" mode options:
+ -d,--defaults   The environment properties with 
whi

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184676407
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows queries to be declared in the SQL 
language, such a query needs to be embedded within a table program that is 
written in either Java or Scala. The table program also needs to be packaged 
with a build tool before it can be submitted to a cluster. This limits the 
usage of Flink mostly to Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be quite a useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster to which table programs can 
be submitted. For more information about setting up a Flink cluster, see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
+
+{% top %}
+
+Configuration
+-
+
+The SQL Client can be started with the following optional CLI commands. 
They are discussed in detail in the subsequent paragraphs.
+
+{% highlight text %}
+./bin/sql-client.sh embedded --help
+
+Mode "embedded" submits Flink jobs from the local machine.
+
+  Syntax: embedded [OPTIONS]
+  "embedded" mode options:
+ -d,--defaults   The environment properties with 
whi

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184673962
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows queries to be declared in the SQL 
language, such a query needs to be embedded within a table program that is 
written in either Java or Scala. The table program also needs to be packaged 
with a build tool before it can be submitted to a cluster. This limits the 
usage of Flink mostly to Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
--- End diff --

"without a single line of **Java or Scala** code." - because you still 
write SQL.


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184675917
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows queries to be declared in the SQL 
language, such a query needs to be embedded within a table program that is 
written in either Java or Scala. The table program also needs to be packaged 
with a build tool before it can be submitted to a cluster. This limits the 
usage of Flink mostly to Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be quite a useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster to which table programs can 
be submitted. For more information about setting up a Flink cluster, see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
--- End diff --

Put the names of the two modes in **bold** so that the reader knows what to 
expect.


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184677353
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows queries to be declared in the SQL 
language, such a query needs to be embedded within a table program that is 
written in either Java or Scala. The table program also needs to be packaged 
with a build tool before it can be submitted to a cluster. This limits the 
usage of Flink mostly to Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be quite a useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster to which table programs can 
be submitted. For more information about setting up a Flink cluster, see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
+
+{% top %}
+
+Configuration
+-
+
+The SQL Client can be started with the following optional CLI commands. 
They are discussed in detail in the subsequent paragraphs.
+
+{% highlight text %}
+./bin/sql-client.sh embedded --help
+
+Mode "embedded" submits Flink jobs from the local machine.
+
+  Syntax: embedded [OPTIONS]
+  "embedded" mode options:
+ -d,--defaults   The environment properties with 
whi

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184676633
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows queries to be declared in the SQL 
language, such a query needs to be embedded within a table program that is 
written in either Java or Scala. The table program also needs to be packaged 
with a build tool before it can be submitted to a cluster. This limits the 
usage of Flink mostly to Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be quite a useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster to which table programs can 
be submitted. For more information about setting up a Flink cluster, see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
+
+{% top %}
+
+Configuration
+-
+
+The SQL Client can be started with the following optional CLI commands. 
They are discussed in detail in the subsequent paragraphs.
+
+{% highlight text %}
+./bin/sql-client.sh embedded --help
+
+Mode "embedded" submits Flink jobs from the local machine.
+
+  Syntax: embedded [OPTIONS]
+  "embedded" mode options:
+ -d,--defaults   The environment properties with 
whi

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184676032
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows queries to be declared in the SQL 
language, such a query needs to be embedded within a table program that is 
written in either Java or Scala. The table program also needs to be packaged 
with a build tool before it can be submitted to a cluster. This limits the 
usage of Flink mostly to Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be quite a useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster to which table programs can 
be submitted. For more information about setting up a Flink cluster, see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
--- End diff --

I do not get the visualization of the CHANGELOG mode. Maybe a small example 
could help.


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184678944
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows queries to be declared in the SQL 
language, such a query needs to be embedded within a table program that is 
written in either Java or Scala. The table program also needs to be packaged 
with a build tool before it can be submitted to a cluster. This limits the 
usage of Flink mostly to Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be quite a useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster to which table programs can 
be submitted. For more information about setting up a Flink cluster, see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
+{% highlight bash %}
+./bin/sql-client.sh embedded
+{% endhighlight %}
+
+This command starts the submission service and CLI embedded in one 
application process. By default, the SQL Client will read its configuration 
from the environment file located in `./conf/sql-client-defaults.yaml`. See the 
[next part](sqlClient.html#environment-files) for more information about the 
structure of environment files.
+
+### Running SQL Queries
+
+Once the CLI has been started, you can use the `HELP` command to list all 
available SQL statements. For validating your setup and cluster connection, you 
can enter your first SQL query and press the `Enter` key to execute it:
+
+{% highlight sql %}
+SELECT 'Hello World'
+{% endhighlight %}
+
+This query requires no table source and produces a single row result. The 
CLI will retrieve results from the cluster and visualize them. You can close 
the result view by pressing the `Q` key.
+
+The CLI supports **two modes** for maintaining and visualizing results.
+
+The *table mode* materializes results in memory and visualizes them in a 
regular, paginated table representation. It can be enabled by executing the 
following command in the CLI:
+
+{% highlight text %}
+SET execution.result-mode=table
+{% endhighlight %}
+
+The *changelog mode* does not materialize results and visualizes the 
result stream that is produced by a continuous query [LINK] consisting of 
insertions (`+`) and retractions (`-`).
+
+{% highlight text %}
+SET execution.result-mode=changelog
+{% endhighlight %}
+
+You can use the following query to see both result modes in action:
+
+{% highlight sql %}
+SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), 
('Bob')) AS NameTable(name) GROUP BY name 
+{% endhighlight %}
+
+This query performs a bounded word count example. The following sections 
explain how to read from table sources and configure other table program 
properties. 
+
+{% top %}
+
+Configuration
+-
+
+The SQL Client can be started with the following optional CLI commands. 
They are discussed in detail in the subsequent paragraphs.
+
+{% highlight text %}
+./bin/sql-client.sh embedded --help
+
+Mode "embedded" submits Flink jobs from the local machine.
+
+  Syntax: embedded [OPTIONS]
+  "embedded" mode options:
+ -d,--defaults   The environment properties with 
whi

[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184675061
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows queries to be declared in the SQL 
language, such a query needs to be embedded within a table program that is 
written in either Java or Scala. The table program also needs to be packaged 
with a build tool before it can be submitted to a cluster. This limits the 
usage of Flink mostly to Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be quite a useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
+This section describes how to set up and run your first Flink SQL program 
from the command line. The SQL Client is bundled in the regular Flink 
distribution and thus runnable out of the box.
+
+The SQL Client requires a running Flink cluster to which table programs can 
be submitted. For more information about setting up a Flink cluster, see the 
[deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
+
+{% highlight bash %}
+./bin/start-cluster.sh
+{% endhighlight %}
+
+### Starting the SQL Client CLI
+
+The SQL Client scripts are also located in the binary directory of Flink. 
You can start the CLI by calling:
+
--- End diff --

What is the `embedded`? What is embedded where?


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184674272
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows queries to be declared in the SQL 
language, such a query needs to be embedded within a table program that is 
written in either Java or Scala. The table program also needs to be packaged 
with a build tool before it can be submitted to a cluster. This limits the 
usage of Flink mostly to Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
--- End diff --

Use either: 
```Attention``` 
or 
```
Note: 
```


---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184674886
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,538 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows queries to be declared in the SQL 
language, such a query needs to be embedded within a table program that is 
written in either Java or Scala. The table program also needs to be packaged 
with a build tool before it can be submitted to a cluster. This limits the 
usage of Flink mostly to Java/Scala programmers.
+
+The *SQL Client* aims to provide an easy way of writing, debugging, and 
submitting table programs to a Flink cluster without a single line of code. The 
*SQL Client CLI* allows for retrieving and visualizing real-time results from 
the running distributed application on the command line.
+
+
+
+**Note:** The SQL Client is in an early development phase. Even though 
the application is not production-ready yet, it can be quite a useful tool for 
prototyping and playing around with Flink SQL. In the future, the community 
plans to extend its functionality by providing a REST-based [SQL Client 
Gateway](sqlClient.html#limitations--future).
+
+* This will be replaced by the TOC
+{:toc}
+
+Getting Started
+---
+
--- End diff --

I would restructure the following two paragraphs along these lines:

```
This section describes how to setup and run your first Flink SQL program 
from the command-line. 

The SQL Client is bundled in the regular Flink distribution and thus 
runnable out-of-the-box. It requires only a running Flink cluster where table 
programs can be executed. For more information about setting up a Flink cluster 
see the [deployment part of this documentation]({{ site.baseurl 
}}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL 
Client, you can also start a local cluster with one worker using the following 
command:
```


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-04-26 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5922

[FLINK-8780] [docs] Add Broadcast State documentation.

R @fhueske 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink broadcast-docs-inv2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5922.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5922


commit 678d6904e4560e8dcd6884e0b8f4d5cef61baa62
Author: kkloudas 
Date:   2018-04-26T10:23:58Z

[FLINK-8780] [docs] Add Broadcast State documentation.




---


[GitHub] flink issue #5910: [FLINK-8841] [state] Remove HashMapSerializer and use Map...

2018-04-26 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5910
  
Thanks @yuqi1129, @bowenli86 and @StefanRRichter for the reviews. I 
integrated your comments. If you are done with reviewing, I will merge it as 
soon as Travis gives the green light.


---


[GitHub] flink pull request #5910: [FLINK-8841] [state] Remove HashMapSerializer and ...

2018-04-25 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5910

[FLINK-8841] [state] Remove HashMapSerializer and use MapSerializer instead.

## What is the purpose of the change

So far we had the `MapSerializer` and the `HashMapSerializer`. The two had 
almost identical code and the second was only used on the 
`HeapStateBackend`/`FSStateBackend` when creating a `MapState`. This PR removes 
the `HashMapSerializer` and replaces its uses with the `MapSerializer`. It also 
guarantees backwards compatibility.

## Brief change log

It introduces the `MigrationUtil` as an inner class of the 
`InstantiationUtil`. This class contains the mapping between deprecated/deleted 
serializers and their replacements.

The removal of the `HashMapSerializer` also unifies the `HeapMapState` and the 
`RocksDBMapState` somewhat.
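
A rough sketch of the idea (the names and structure here are illustrative, not 
the actual implementation):

    import java.util.Collections;
    import java.util.Map;

    // Illustrative sketch: map the fully-qualified name of a removed serializer
    // to the serializer class that replaces it when restoring old state.
    final class MigrationUtil {

        private static final Map<String, Class<?>> SERIALIZER_REPLACEMENTS =
                Collections.singletonMap(
                        "org.apache.flink.runtime.state.HashMapSerializer", // removed
                        org.apache.flink.api.common.typeutils.base.MapSerializer.class);

        static Class<?> replacementFor(String removedSerializerClassName) {
            return SERIALIZER_REPLACEMENTS.get(removedSerializerClassName);
        }
    }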

## Verifying this change

Added the 
`HeapKeyedStateBackendSnapshotMigrationTest#testMapStateMigrationAfterHashMapSerRemoval()`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: yes
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable

R @StefanRRichter or @aljoscha 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink map-serializer-inv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5910.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5910


commit 7de83de4765384080cea6d94b64a81e1584ce82e
Author: kkloudas 
Date:   2018-04-24T12:48:34Z

[FLINK-8841] Remove HashMapSerializer and use MapSerializer instead.




---


[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-04-24 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r183745456
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector 
keySelector)  {
public  WithWindow 
window(WindowAssigner, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
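// Illustrative usage sketch (not part of the diff above), assuming two keyed
// streams of Event records with a getKey() accessor, running in event time:
//
//     left.join(right)
//         .where(Event::getKey)
//         .equalTo(Event::getKey)
//         .between(Time.milliseconds(-2), Time.milliseconds(2));
//
// By default both bounds are inclusive, as the Javadoc above describes.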
+   /**
+* Joined streams that have keys for both sides as well as the time 
boundaries over which
+* elements should be joined defined.
+*
+* @param  Input type of elements from the first stream
+* @param  Input type of elements from the second stream
+* @param  The type of the key
+*/
+   public static class TimeBounded {
+
+   private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
"TimeBoundedJoin";
+
+   private final DataStream left;
+   private final DataStream right;
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final KeySelector keySelector1;
+   private final KeySelector keySelector2;
+
+   private boolean lowerBoundInclusive;
+   private boolean upperBoundInclusive;
+
+   public TimeBounded(
+   DataStream left,
+   DataStream right,
+   long lowerBound,
+   long upperBound,
+   boolean lowerBoundInclusive,
+   boolean upperBoundInclusive,
+   KeySelector keySelector1,
+   KeySelector keySelector2) {
+
+   this.left = Preconditions.checkNotNull(left);
+   this.right = Preconditions.checkNotNull(right);
+
+   this.lowerBound = lowerBound;
+   this.upperBound = upperBound;
+
+   this.lowerBoundInclusive = lowerBoundInclusive;
+   this.upperBoundInclusive = upperBoundInclusive;
+
+   this.keySelector1 = 
Preconditions.checkNotNull(keySelector1);
+   this.keySelector2 = 
Preconditions.checkNotNull(keySelector2);
+   }
+
+   /**
+* Configure whether the upper bound should be considered 
exclusive or inclusive.
+*/
+   public TimeBounded upperBoundExclusive(boolean 
exclusive) {

[GitHub] flink issue #5813: [FLINK-8980] [e2e] Add a BucketingSink end-to-end test

2018-04-19 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5813
  
Sounds good @twalthr !


---


[GitHub] flink issue #5813: [FLINK-8980] [e2e] Add a BucketingSink end-to-end test

2018-04-16 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5813
  
Thanks for the work @twalthr ! The test looks good but it fails 
occasionally due to https://issues.apache.org/jira/browse/FLINK-9113. Given 
that the test is unstable, I would suggest to not merge it for now, as it will 
lead to unstable builds.


---


[GitHub] flink pull request #5830: [FLINK-9152] Harmonize BroadcastProcessFunction Co...

2018-04-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5830#discussion_r180541104
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
 ---
@@ -89,11 +89,11 @@
 * query the current processing/event time, and also query and update 
the internal
 * {@link org.apache.flink.api.common.state.BroadcastState broadcast 
state}. In addition, it
 * can register a {@link KeyedStateFunction function} to be applied to 
all keyed states on
-* the local partition. These can be done through the provided {@link 
Context}.
+* the local partition. These can be done through the provided {@link 
BaseBroadcastProcessFunction.Context}.
--- End diff --

Remove the `BaseBroadcastProcessFunction`.


---


[GitHub] flink pull request #5830: [FLINK-9152] Harmonize BroadcastProcessFunction Co...

2018-04-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5830#discussion_r180538022
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
 ---
@@ -33,6 +33,8 @@
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction.Context;
--- End diff --

Unused import.


---


[GitHub] flink pull request #5830: [FLINK-9152] Harmonize BroadcastProcessFunction Co...

2018-04-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5830#discussion_r180541190
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
 ---
@@ -89,11 +89,11 @@
 * query the current processing/event time, and also query and update 
the internal
 * {@link org.apache.flink.api.common.state.BroadcastState broadcast 
state}. In addition, it
 * can register a {@link KeyedStateFunction function} to be applied to 
all keyed states on
-* the local partition. These can be done through the provided {@link 
Context}.
+* the local partition. These can be done through the provided {@link 
BaseBroadcastProcessFunction.Context}.
 * The context is only valid during the invocation of this method, do 
not store it.
 *
 * @param value The stream element.
-* @param ctx A {@link Context} that allows querying the timestamp of 
the element,
+* @param ctx A {@link BaseBroadcastProcessFunction.Context} that 
allows querying the timestamp of the element,
--- End diff --

Remove the `BaseBroadcastProcessFunction`.


---


[GitHub] flink issue #5811: [FLINK-9113] [connectors] Fix flushing behavior of bucket...

2018-04-06 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5811
  
Well it seems like for these tests, the `flush` is not actually flushing. 
The files are there, the `validPartLength` is correct (=6 as we just write 
`test1\n`) but the data is not actually on disk. If you call `close()` on the 
in-progress file when snapshotting, then the tests succeed and the data is 
there.

I would recommend just removing the check for now and opening a follow-up 
JIRA that contains the removed check and also points to the discussion about 
HDFS not flushing, so that we can then see how to proceed.

I think the fact that the end-to-end tests pass points in the direction that 
something is wrong with the FS abstraction.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827723
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
+
+   public void setStuff(List stuff) {
+   this.stuff = stuff;
+   }
+
+   private List stuff;
+
+   public void setAsdf(Long asdf) {
+   this.asdf = asdf;
+   }
+
+   private Long asdf = 0L;
+
+   private transient LabelSurrogate label;
+
+   public EmailInformation() {
+
+   }
+
+   public EmailInformation(Email email) {
+   emailId = email.getEmailId();
+   //  timestamp = email.getTimestamp();
+   stuff = new ArrayList<>();
+   stuff.add("1");
+   stuff.add("2");
+   stuff.add("3");
+   label = email.getLabel();
+   }
+
+   public EmailId getEmailId() {
+   return emailId;
+   }
+
+// //public Instant getTimestamp() {
+// return timestamp;
+// }
+
+   public List getStuff() {
+   return stuff;
+   }
+
+   public Long getAsdf() {
+   return asdf;
+   }
+
+   public LabelSurrogate getLabel() {
+   return label;
+   }
+
+   public void setLabel(LabelSurrogate label) {
+   this.label = label;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   EmailInformation that = (EmailInformation) o;
+   return Objects.equals(emailId, that.emailId) &&
+// Objects.equals(timestamp, that.timestamp) &&
--- End diff --

remove.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178828967
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Javadoc.
+ */
+public class QsBugPoc {
+
+   public static final String QUERYABLE_STATE_NAME = "state";
+   public static final String STATE_NAME = "state";
+
+   public static void main(final String[] args) throws Exception {
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   RocksDBStateBackend rocksDb = new 
RocksDBStateBackend("file:///tmp/deleteme-rocksdb");
+   env.setStateBackend(rocksDb);
--- End diff --

The checkpoint directory can be a parameter, and here it should be a path 
inside the `TEST_DIR` of the test itself. In addition, everything should be 
explicitly cleaned up, e.g. checkpoints and potential output/input data.
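
Something along the lines of the following sketch (the parameter name is 
illustrative) would do:

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

    // Illustrative sketch: take the checkpoint directory from a program argument
    // (e.g. a path under the TEST_DIR of the test) instead of hard-coding /tmp.
    final ParameterTool params = ParameterTool.fromArgs(args);
    final String checkpointDir = params.getRequired("checkpoint-dir");
    env.setStateBackend(new RocksDBStateBackend(checkpointDir));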


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178828372
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -37,6 +37,14 @@ echo "Flink distribution directory: $FLINK_DIR"
 
 EXIT_CODE=0
 
--- End diff --

I would recommend moving it to the nightly tests. Queryable state is not 
a core component, and the normal builds are already timing out.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827995
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * Javadoc.
+ */
+public class LabelSurrogate {
+
+   private Type type;
+   private String foo;
+
+   public LabelSurrogate(Type type, String foo) {
+   this.type = type;
+   this.foo = foo;
+   }
+
+   public Type getType() {
+   return type;
+   }
+
+   public void setType(Type type) {
+   this.type = type;
+   }
+
+   public String getFoo() {
+   return foo;
+   }
+
+   public void setFoo(String foo) {
+   this.foo = foo;
+   }
+
+   @Override
+   public String toString() {
+   return "LabelSurrogate{" +
+   "type=" + type +
+   ", foo='" + foo + '\'' +
+   '}';
+   }
+
+   /**
+* Javadoc.
+*/
--- End diff --

Same here.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827936
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * Javadoc.
+ */
--- End diff --

This is just a placeholder comment added so that the checkstyle verification 
passes. Please write a real comment. 
This also holds for other places.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178829164
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Javadoc.
+ */
+public class QsBugPoc {
+
+   public static final String QUERYABLE_STATE_NAME = "state";
+   public static final String STATE_NAME = "state";
+
+   public static void main(final String[] args) throws Exception {
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   RocksDBStateBackend rocksDb = new 
RocksDBStateBackend("file:///tmp/deleteme-rocksdb");
+   env.setStateBackend(rocksDb);
--- End diff --

Also check for different backends, i.e. file and rocks. You can have a look 
at `test_ha.sh`.
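
For example, something along the lines of this sketch (the function name is 
illustrative):

    # Run the same test once per state backend.
    for backend in file rocks; do
        run_queryable_state_test "$backend"
    done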


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827415
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
--- End diff --

Remove the commented-out methods.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827698
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
+
+   public void setStuff(List<String> stuff) {
+   this.stuff = stuff;
+   }
+
+   private List<String> stuff;
+
+   public void setAsdf(Long asdf) {
+   this.asdf = asdf;
+   }
+
+   private Long asdf = 0L;
+
+   private transient LabelSurrogate label;
+
+   public EmailInformation() {
+
+   }
+
+   public EmailInformation(Email email) {
+   emailId = email.getEmailId();
+   //  timestamp = email.getTimestamp();
--- End diff --

remove.


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827543
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
--- End diff --

Same here (remove the commented-out field).


---


[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

2018-04-03 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5807#discussion_r178827678
  
--- Diff: 
flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Javadoc.
+ */
+public class EmailInformation implements Serializable {
+
+   private static final long serialVersionUID = -8956979869800484909L;
+
+   public void setEmailId(EmailId emailId) {
+   this.emailId = emailId;
+   }
+
+   private EmailId emailId;
+
+// public void setTimestamp(Instant timestamp) {
+// this.timestamp = timestamp;
+// }
+
+   //private Instant timestamp;
+
+   public void setStuff(List<String> stuff) {
+   this.stuff = stuff;
+   }
+
+   private List<String> stuff;
+
+   public void setAsdf(Long asdf) {
+   this.asdf = asdf;
+   }
+
+   private Long asdf = 0L;
+
+   private transient LabelSurrogate label;
+
+   public EmailInformation() {
+
+   }
+
+   public EmailInformation(Email email) {
+   emailId = email.getEmailId();
+   //  timestamp = email.getTimestamp();
+   stuff = new ArrayList<>();
+   stuff.add("1");
+   stuff.add("2");
+   stuff.add("3");
+   label = email.getLabel();
+   }
+
+   public EmailId getEmailId() {
+   return emailId;
+   }
+
+// //public Instant getTimestamp() {
--- End diff --

remove.


---


[GitHub] flink issue #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMachineEx...

2018-04-03 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5750
  
Thanks a lot @twalthr for the review!  


---


[GitHub] flink pull request #5756: [FLINK-8813][flip6] Disallow PARALLELISM_AUTO_MAX ...

2018-03-29 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5756#discussion_r178030866
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -93,6 +93,10 @@ private MiniClusterResource(
this.enableClusterClient = enableClusterClient;
}
 
+   public MiniClusterType getMiniClusterType() {
--- End diff --

This is definitely not a big issue. I will merge. 


---


[GitHub] flink pull request #5756: [FLINK-8813][flip6] Disallow PARALLELISM_AUTO_MAX ...

2018-03-28 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5756#discussion_r15838
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 ---
@@ -93,6 +93,10 @@ private MiniClusterResource(
this.enableClusterClient = enableClusterClient;
}
 
+   public MiniClusterType getMiniClusterType() {
--- End diff --

Why not rename it to something like `isLegacyDeployment()` and return a 
`boolean`? This would also allow making `MiniClusterType` a private class, 
which is a better pattern.
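
A minimal sketch of the suggested shape, with illustrative names rather than 
the actual `MiniClusterResource` code:

public class MiniClusterResourceSketch {

    // Keeping the enum private becomes possible once callers only see the
    // boolean query below.
    private enum MiniClusterType { LEGACY, NEW }

    private final MiniClusterType miniClusterType;

    public MiniClusterResourceSketch(boolean legacy) {
        this.miniClusterType = legacy ? MiniClusterType.LEGACY : MiniClusterType.NEW;
    }

    public boolean isLegacyDeployment() {
        return miniClusterType == MiniClusterType.LEGACY;
    }
}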


---


[GitHub] flink issue #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMachineEx...

2018-03-27 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5750
  
Hi @walterddr, I addressed most of your comments. Feel free to have another 
look.



---


[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

2018-03-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r177436350
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,9 +162,42 @@ function start_cluster {
   done
 }
 
+function jm_watchdog() {
+expectedJms=$1
+ipPort=$2
+
+while true; do
+runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+missingJms=$((expectedJms-runningJms))
+for (( c=0; c

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

2018-03-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r177364376
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,109 @@ cd $TEST_ROOT
 export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
 echo "TEST_DATA_DIR: $TEST_DATA_DIR"
 
+function revert_default_config() {
+
+# first revert the conf/masters file
+echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+# and then the conf/flink-conf.yaml
+sed 's/^//g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+
#==
+# Common
+
#==
+
+jobmanager.rpc.address: localhost
+jobmanager.rpc.port: 6123
+jobmanager.heap.mb: 1024
+taskmanager.heap.mb: 1024
+taskmanager.numberOfTaskSlots: 1
+parallelism.default: 1
+
+
#==
+# Web Frontend
+
#==
+
+web.port: 8081
+EOL
+}
+
+function create_ha_config() {
+
+# create the masters file (only one currently).
+# This must have all the masters to be used in HA.
+echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+# clean up the dir that will be used for zookeeper storage
+# (see high-availability.zookeeper.storageDir below)
+if [ -e $TEST_DATA_DIR/recovery ]; then
+   echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
+   rm -rf $TEST_DATA_DIR/recovery
+fi
+
+# then move on to create the flink-conf.yaml
+sed 's/^//g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+
#==
+# Common
+
#==
+
+jobmanager.rpc.address: localhost
+jobmanager.rpc.port: 6123
+jobmanager.heap.mb: 1024
+taskmanager.heap.mb: 1024
+taskmanager.numberOfTaskSlots: 4
+parallelism.default: 1
+
+
#==
+# High Availability
+
#==
+
+high-availability: zookeeper
+high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.cluster-id: /test_cluster_one
+
+
#==
+# Web Frontend
+
#==
+
+web.port: 8081
+EOL
+}
+
+function start_ha_cluster {
+echo "Setting up HA Cluster..."
+create_ha_config
+start_local_zk
+start_cluster
+}
+
+function start_local_zk {
+# Parses the zoo.cfg and starts locally zk.
+
+# This is almost the same code as the
+# /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost.
--- End diff --

The problem is that the `zookeeper` script does not take into account the 
fact that we may want to launch `zookeeper` locally, so it always asks for 
`SSH` credentials.


---


[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...

2018-03-27 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5751
  
Hi @sihuazhou ! I think that, for now, materializing the list of keys and 
then passing it to `process()` is the best solution. But keep in mind that 
this is only needed for the `HeapKeyedStateBackend` and not for RocksDB, so 
the method should create the copy only there.
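
A minimal, self-contained sketch of that idea (illustrative names, not the 
actual backend API): copy the keys first, so that deletions performed by the 
callback cannot invalidate the iterator they came from.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class KeySnapshotSketch {

    // Materialize the keys up front; this would only be needed for the heap
    // backend, where the callback may mutate the underlying state tables
    // while they are being iterated.
    static <K> void applyToAllKeys(Iterator<K> keys, Consumer<K> process) {
        List<K> snapshot = new ArrayList<>();
        keys.forEachRemaining(snapshot::add);
        for (K key : snapshot) {
            process.accept(key);
        }
    }

    public static void main(String[] args) {
        applyToAllKeys(Arrays.asList("a", "b", "c").iterator(), System.out::println);
    }
}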


---


  1   2   3   4   5   6   7   8   9   10   >