[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r202574665 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java --- @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Sink that emits its input elements to {@link FileSystem} files within buckets. This is + * integrated with the checkpointing mechanism to provide exactly once semantics. + * + * + * When creating the sink a {@code basePath} must be specified. The base directory contains + * one directory for every bucket. The bucket directories themselves contain several part files, + * with at least one for each parallel subtask of the sink which is writing data to that bucket. + * These part files contain the actual output data. 
+ * + * + * The sink uses a {@link Bucketer} to determine in which bucket directory each element should + * be written to inside the base directory. The {@code Bucketer} can, for example, use time or + * a property of the element to determine the bucket directory. The default {@code Bucketer} is a + * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify + * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. + * + * + * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink + * and a rolling counter. For example the file {@code "part-1-17"} contains the data from + * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. + * When a part file becomes bigger than the user-specified part size or when the part file becomes older + * than the user-specified roll over interval the current part file is closed, the part counter is increased + * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured + * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and
[GitHub] flink issue #6281: [FLINK-9750] Add new StreamingFileSink with ResumableWrit...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6281 Hi @aljoscha and @StephanEwen. I have updated the PR; please have a look. ---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201342618 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java --- @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Sink that emits its input elements to {@link FileSystem} files within buckets. This is + * integrated with the checkpointing mechanism to provide exactly once semantics. + * + * + * When creating the sink a {@code basePath} must be specified. The base directory contains + * one directory for every bucket. The bucket directories themselves contain several part files, + * with at least one for each parallel subtask of the sink which is writing data to that bucket. + * These part files contain the actual output data. 
+ * + * + * The sink uses a {@link Bucketer} to determine in which bucket directory each element should + * be written to inside the base directory. The {@code Bucketer} can, for example, use time or + * a property of the element to determine the bucket directory. The default {@code Bucketer} is a + * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify + * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. + * + * + * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink + * and a rolling counter. For example the file {@code "part-1-17"} contains the data from + * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. + * When a part file becomes bigger than the user-specified part size or when the part file becomes older + * than the user-specified roll over interval the current part file is closed, the part counter is increased + * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured + * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201059374 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java --- @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A bucket is the directory organization of the output of the {@link BucketingSink}. + * + * + * For each incoming element in the {@code BucketingSink}, the user-specified + * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is + * queried to see in which bucket this element should be written to. + */ +public class Bucket { + + private static final String PART_PREFIX = "part"; + + private final Path bucketPath; + + private int subtaskIndex; + + private long partCounter; + + private long creationTime; + + private long lastWrittenTime; + + private final long maxPathSize; + + private final long rolloverTime; + + private final long inactivityTime; + + private final Writer outputFormatWriter; + + private final ResumableWriter fsWriter; + + private RecoverableFsDataOutputStream currentOpenPartStream; + + private List pending = new ArrayList<>(); + + private Map> pendingPerCheckpoint = new HashMap<>(); + + public Bucket( + ResumableWriter fsWriter, + int subtaskIndex, + Path bucketPath, + long initialPartCounter, + long maxPartSize, + long rolloverTime, + long inactivityTime, + Writer writer, + BucketState bucketstate) throws IOException { + + this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer); + + // the constructor must have already initialized the filesystem writer + Preconditions.checkState(fsWriter != null); + + // we try to resume the previous in-progress file, if the filesystem + // supports such operation. If not, we just commit the file and start fresh. 
+ + final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress(); + if (resumable != null) { + this.currentOpenPartStream = fsWriter.recover(resumable); + this.creationTime = bucketstate.getCreationTime(); + } + + // we commit pending files for previous checkpoints to the last successful one + // (from which we are recovering from) + for (List commitables: bucketstate.getPendingPerCheckpoint().values()) { + for (ResumableWriter.CommitRecoverable commitable: commitables) { + fsWriter.recoverForCommit(commitable).commit(); + } + } + + this.pending = new ArrayList<>(); + this.pendingPerCheckpoint = new HashMap<>(); + } + + public Bucket( + ResumableWriter fsWriter, + int subtaskIndex, + Path bucketPath, + long initialPartCounter, + lo
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201059444 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java --- @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A bucket is the directory organization of the output of the {@link BucketingSink}. + * + * + * For each incoming element in the {@code BucketingSink}, the user-specified + * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is + * queried to see in which bucket this element should be written to. + */ +public class Bucket { + + private static final String PART_PREFIX = "part"; + + private final Path bucketPath; + + private int subtaskIndex; + + private long partCounter; + + private long creationTime; + + private long lastWrittenTime; + + private final long maxPathSize; + + private final long rolloverTime; + + private final long inactivityTime; + + private final Writer outputFormatWriter; + + private final ResumableWriter fsWriter; + + private RecoverableFsDataOutputStream currentOpenPartStream; + + private List pending = new ArrayList<>(); + + private Map> pendingPerCheckpoint = new HashMap<>(); + + public Bucket( + ResumableWriter fsWriter, + int subtaskIndex, + Path bucketPath, + long initialPartCounter, + long maxPartSize, + long rolloverTime, + long inactivityTime, + Writer writer, + BucketState bucketstate) throws IOException { + + this(fsWriter, subtaskIndex, bucketPath, initialPartCounter, maxPartSize, rolloverTime, inactivityTime, writer); + + // the constructor must have already initialized the filesystem writer + Preconditions.checkState(fsWriter != null); + + // we try to resume the previous in-progress file, if the filesystem + // supports such operation. If not, we just commit the file and start fresh. 
+ + final ResumableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress(); + if (resumable != null) { + this.currentOpenPartStream = fsWriter.recover(resumable); + this.creationTime = bucketstate.getCreationTime(); + } + + // we commit pending files for previous checkpoints to the last successful one + // (from which we are recovering from) + for (List commitables: bucketstate.getPendingPerCheckpoint().values()) { + for (ResumableWriter.CommitRecoverable commitable: commitables) { + fsWriter.recoverForCommit(commitable).commit(); --- End diff -- You are right! ---
[GitHub] flink issue #6176: [FLINK-9603][connector-filesystem] fix part indexing, whe...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6176 Hi @kent2171! Do not worry, I can fix it ;) Thanks for having a look! ---
[GitHub] flink issue #6176: [FLINK-9603][connector-filesystem] fix part indexing, whe...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6176 Thanks for the work, @kent2171! I will merge this as soon as Travis gives the green light! ---
[GitHub] flink issue #6281: [FLINK-9750] Add new StreamingFileSink with ResumableWrit...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6281 Hi @xndai! I will update the outdated design doc and attach it to the JIRA. I will then ping you here so you can have a look. ---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r200909011 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/Writer.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.core.fs.FSDataOutputStream; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Javadoc. --- End diff -- Thanks for the catch! I will update. ---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/6281 [FLINK-9750] Add new StreamingFileSink with ResumableWriter. ## What is the purpose of the change This PR is the first step towards introducing a new Streaming Filesystem sink that works on top of Flink's filesystem abstraction and provides support for both row and block-based formats (like ORC/Parquet). The current version only supports the LocalFileSystem. ## Brief change log The first commit introduces the new `ResumableWriter` abstraction and an implementation for the `LocalFileSystem`, while the second introduces the new `StreamingFileSink`. ## Verifying this change This change adds tests in the `LocalStreamingFileSinkTest` and the `BucketStateSerializerTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not documented yet **NOTE TO REVIEWER**: Logging is still missing. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink bucketing-local-inv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6281.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6281 commit 64cdd7b57f6f71db3fa8fe90d9eb37e784b9ae56 Author: Stephan Ewen Date: 2018-06-29T17:27:58Z [hotfix] [core] Remove unused class AbstractMultiFSDataInputStream commit 2a7ed070dd7b57a58a7588b7ce03c4032a3283fc Author: Stephan Ewen Date: 2018-06-29T17:15:54Z [FLINK-9751][filesystem] Add PersistentResumableWriter interface. commit da64fedced17cef7d53448dac360a26ae9d32204 Author: kkloudas Date: 2018-07-06T14:38:08Z [FLINK-9750] Add new StreamingFileSink on top of the ResumableWriter. This commit introduces the new sink and tests it on the LocalFileSystem. ---
[GitHub] flink issue #5342: [FLINK-8479] Timebounded stream join
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5342 Thanks for the work @florianschmidt1994 ! Merging this. ---
[GitHub] flink pull request #6171: [FLINK-9593] Unified After Match semantics with SQ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6171#discussion_r198474417 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -330,77 +328,85 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta } } - discardComputationStatesAccordingToStrategy( - sharedBuffer, computationStates, result, afterMatchSkipStrategy); + if (!potentialMatches.isEmpty()) { + nfaState.setStateChanged(); + } + + List>> result = new ArrayList<>(); + if (afterMatchSkipStrategy.isSkipStrategy()) { + processMatchesAccordingToSkipStrategy(sharedBuffer, + nfaState, + afterMatchSkipStrategy, + potentialMatches, + result); + } else { + for (ComputationState match : potentialMatches) { + result.add(sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(match.getPreviousBufferEntry(), --- End diff -- Instead of accessing the state for every match, why not passing all the matches to the shared buffer, and try to fetch the common ones only once. If 2 matches A and B share event with id = 2, we fetch from state only once. ---
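A minimal sketch of the caching idea suggested above, detached from the actual `SharedBuffer` API (the class and the `Function`-based lookup are illustrative assumptions, not Flink code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Illustrative only: events shared by several matches are fetched from the
 * backing state at most once and then served from a local map.
 */
final class EventFetchCache<K, V> {

    private final Map<K, V> cache = new HashMap<>();
    private final Function<K, V> stateLookup; // e.g. a MapState#get wrapped in a lambda

    EventFetchCache(Function<K, V> stateLookup) {
        this.stateLookup = stateLookup;
    }

    V get(K eventId) {
        // hit the state backend only on the first request for this event id;
        // if matches A and B both reference the event with id = 2, the second lookup is free
        return cache.computeIfAbsent(eventId, stateLookup);
    }
}
```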
[GitHub] flink pull request #6171: [FLINK-9593] Unified After Match semantics with SQ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6171#discussion_r198473426 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa.aftermatch; + +import org.apache.flink.cep.nfa.ComputationState; +import org.apache.flink.cep.nfa.sharedbuffer.EventId; +import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; + + +/** + * Indicate the skip strategy after a match process. + */ +public abstract class AfterMatchSkipStrategy implements Serializable { + + private static final long serialVersionUID = -4048930333619068531L; + + /** +* Discards every partial match that contains event of the match preceding the first of *PatternName*. +* +* @param patternName the pattern name to skip to +* @return the created AfterMatchSkipStrategy +*/ + public static AfterMatchSkipStrategy skipToFirst(String patternName) { + return new SkipToFirstStrategy(patternName); + } + + /** +* Discards every partial match that contains event of the match preceding the last of *PatternName*. +* +* @param patternName the pattern name to skip to +* @return the created AfterMatchSkipStrategy +*/ + public static AfterMatchSkipStrategy skipToLast(String patternName) { + return new SkipToLastStrategy(patternName); + } + + /** +* Discards every partial match that contains event of the match. +* +* @return the created AfterMatchSkipStrategy +*/ + public static AfterMatchSkipStrategy skipPastLastEvent() { + return SkipPastLastStrategy.INSTANCE; + } + + /** +* Every possible match will be emitted. +* +* @return the created AfterMatchSkipStrategy +*/ + public static AfterMatchSkipStrategy noSkip() { + return NoSkipStrategy.INSTANCE; + } + + /** +* Tells if the strategy may skip some matches. +* +* @return false if the strategy is NO_SKIP strategy +*/ + public abstract boolean isSkipStrategy(); + + /** +* Prunes matches/partial matches based on the chosen strategy. +* +* @param partialMatches current partial matches +* @param matchedResult already completed matches +* @param sharedBuffer corresponding shared buffer +* @throws Exception thrown if could not access the state +*/ + public void prune( + Collection partialMatches, --- End diff -- The name `partialMatches` is misleading because we use it also with the `completedMatches`. ---
[GitHub] flink pull request #6171: [FLINK-9593] Unified After Match semantics with SQ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6171#discussion_r198472975 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java --- @@ -79,18 +98,18 @@ public boolean equals(Object o) { return false; } NFAState nfaState = (NFAState) o; - return Objects.equals(computationStates, nfaState.computationStates); + return Objects.equals(partialMatches, nfaState.partialMatches); } @Override public int hashCode() { - return Objects.hash(computationStates, stateChanged); + return Objects.hash(partialMatches, stateChanged); --- End diff -- Same as above. ---
[GitHub] flink pull request #6171: [FLINK-9593] Unified After Match semantics with SQ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6171#discussion_r198472927 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java --- @@ -79,18 +98,18 @@ public boolean equals(Object o) { return false; } NFAState nfaState = (NFAState) o; - return Objects.equals(computationStates, nfaState.computationStates); + return Objects.equals(partialMatches, nfaState.partialMatches); --- End diff -- What about the `completedMatches`? ---
[GitHub] flink pull request #6171: [FLINK-9593] Unified After Match semantics with SQ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6171#discussion_r198473858 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -330,77 +328,85 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta } } - discardComputationStatesAccordingToStrategy( - sharedBuffer, computationStates, result, afterMatchSkipStrategy); + if (!potentialMatches.isEmpty()) { + nfaState.setStateChanged(); + } + + List>> result = new ArrayList<>(); + if (afterMatchSkipStrategy.isSkipStrategy()) { + processMatchesAccordingToSkipStrategy(sharedBuffer, + nfaState, + afterMatchSkipStrategy, + potentialMatches, + result); + } else { + for (ComputationState match : potentialMatches) { + result.add(sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(match.getPreviousBufferEntry(), + match.getVersion()).get(0))); + sharedBuffer.releaseNode(match.getPreviousBufferEntry()); + } + } return result; } - private void discardComputationStatesAccordingToStrategy( - final SharedBuffer sharedBuffer, - final Queue computationStates, - final Collection>> matchedResult, - final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception { + private void processMatchesAccordingToSkipStrategy( + SharedBuffer sharedBuffer, + NFAState nfaState, + AfterMatchSkipStrategy afterMatchSkipStrategy, + PriorityQueue potentialMatches, + List>> result) throws Exception { - Set discardEvents = new HashSet<>(); - switch(afterMatchSkipStrategy.getStrategy()) { - case SKIP_TO_LAST: - for (Map> resultMap: matchedResult) { - for (Map.Entry> keyMatches : resultMap.entrySet()) { - if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) { - discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1)); - break; - } else { - discardEvents.addAll(keyMatches.getValue()); - } - } - } - break; - case SKIP_TO_FIRST: - for (Map> resultMap: matchedResult) { - for (Map.Entry> keyMatches : resultMap.entrySet()) { - if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) { - break; - } else { - discardEvents.addAll(keyMatches.getValue()); - } - } - } - break; - case SKIP_PAST_LAST_EVENT: - for (Map> resultMap: matchedResult) { - for (List eventList: resultMap.values()) { - discardEvents.addAll(eventList); - } - } - break; - } - if (!discardEvents.isEmpty()) { - List discardStates = new ArrayList<>(); - for (ComputationState computationState : computationStates) { - boolean discard = false; - Map> partialMatch = extractCurrentMatches(sharedBuffer, computationState); - for (List list: partialMatch.values()) { - for (T e: list) { - if (discardEvents.contains(e)) { -
[GitHub] flink pull request #6152: [FLINK-9467][metrics][WebUI] Fix watermark display...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6152#discussion_r197087153 --- Diff: docs/monitoring/metrics.md --- @@ -1254,15 +1254,15 @@ Thus, in order to infer the metric identifier: Counter - Operator currentInputWatermark -The last watermark this operator has received (in milliseconds). -Note: For operators with 2 inputs this is the minimum of the last received watermarks. +The last watermark this operator/tasks has received (in milliseconds). +Note: For operators/tasks with 2 inputs this is the minimum of the last received watermarks. Gauge + Operator --- End diff -- I would put "Operator Only" to emphasize the diff with the above. ---
[GitHub] flink pull request #6152: [FLINK-9467][metrics][WebUI] Fix watermark display...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6152#discussion_r197086995 --- Diff: docs/monitoring/metrics.md --- @@ -1228,7 +1228,7 @@ Thus, in order to infer the metric identifier: Meter - Task/Operator + Task/Operator --- End diff -- I would put "Task and Operator" rather than "Task/Operator" ---
[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6130 Hi @bowenli86! @zentol, @aljoscha and I all seem to have doubts about the utility of the feature. Given this, and to keep the JIRA board and the list of open PRs clean, I would suggest closing the PR and the related issue. ---
[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6130 Hi @bowenli86! Why not have a `flatmap` after the `readFile` and, for every incoming element, emit as many copies as you want? Personally, I am not so fond of adding methods to the public APIs for such specific use cases. ---
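A minimal sketch of that suggestion (the input path, the copy count, and the surrounding job setup are hypothetical and only for illustration):

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadFileWithCopies {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final int copies = 3; // hypothetical: how many times each element should be emitted

        DataStream<String> lines = env.readTextFile("file:///tmp/input.txt"); // hypothetical path

        lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) {
                // re-emit every incoming element as many times as needed
                for (int i = 0; i < copies; i++) {
                    out.collect(value);
                }
            }
        }).print();

        env.execute("read-file-with-copies");
    }
}
```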
[GitHub] flink issue #5342: [FLINK-8479] Timebounded stream join
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5342 Thanks @florianschmidt1994. I will, but maybe not today. ---
[GitHub] flink issue #5960: [Flink-8725] Separate state from NFA in CEP library
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5960 @dawidwys When you merge the other one, could you also close this PR? ---
[GitHub] flink issue #6059: [Flink-9418] Migrate SharedBuffer to use MapState
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6059 It would be great if @Aitozi could also report any numbers he has, so that we see the benefits of the change. ---
[GitHub] flink issue #6126: [FLINK-9530][metrics] Fix numRecords task metric for chai...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6126 I got this; it is just that I am wondering how useful this is, given that the operators/functions are chained. ---
[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6059#discussion_r194473454 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -296,42 +292,31 @@ public void resetNFAChanged() { if (shouldDiscardPath) { // a stop state was reached in this branch. release branch which results in removing previous event from // the buffer - for (final ComputationState state : statesToRetain) { - eventSharedBuffer.release( - NFAStateNameHandler.getOriginalNameFromInternal( - state.getPreviousState().getName()), - state.getEvent(), - state.getTimestamp(), - state.getCounter()); + for (final ComputationState state : statesToRetain) { + sharedBuffer.releaseNode(state.getPreviousBufferEntry()); } } else { computationStates.addAll(statesToRetain); } } - discardComputationStatesAccordingToStrategy(computationStates, result, afterMatchSkipStrategy); - - // prune shared buffer based on window length - if (windowTime > 0L) { - long pruningTimestamp = timestamp - windowTime; - - if (pruningTimestamp < timestamp) { - // the check is to guard against underflows + discardComputationStatesAccordingToStrategy( + sharedBuffer, computationStates, result, afterMatchSkipStrategy); - // remove all elements which are expired - // with respect to the window length - if (eventSharedBuffer.prune(pruningTimestamp)) { - nfaChanged = true; - } - } + if (event.getEvent() == null) { --- End diff -- actually I would recommend having a method: `nfa.advanceTime()` or `nfa.clearUpTo(timestamp)` and we call it from the `AbstractKeyedCEPPatternOperator.advanceTime()` after calling `processEvent()`. This way it is clearer what it does and when it is called. What do you think? ---
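A rough sketch of the shape this proposal has in mind (the method and class names follow the suggestion above; this is not existing Flink API, and the pruning body is only a comment-level outline):

```java
/**
 * Sketch only: the explicit time-advancement entry point the comment proposes.
 * The operator would call it right after processEvent(), instead of signalling
 * the advancement of time by passing a null event into the NFA.
 */
public class NFATimeAdvanceSketch<T> {

    private final long windowTime; // window length of the compiled pattern, 0 if unbounded

    public NFATimeAdvanceSketch(long windowTime) {
        this.windowTime = windowTime;
    }

    public void advanceTime(long timestamp) {
        if (windowTime > 0L) {
            final long pruningTimestamp = timestamp - windowTime;
            if (pruningTimestamp < timestamp) { // guard against underflow, as in the quoted code
                // prune expired shared-buffer entries and drop timed-out partial matches here
            }
        }
    }
}
```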
[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6059#discussion_r194470370 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -296,42 +292,31 @@ public void resetNFAChanged() { if (shouldDiscardPath) { // a stop state was reached in this branch. release branch which results in removing previous event from // the buffer - for (final ComputationState state : statesToRetain) { - eventSharedBuffer.release( - NFAStateNameHandler.getOriginalNameFromInternal( - state.getPreviousState().getName()), - state.getEvent(), - state.getTimestamp(), - state.getCounter()); + for (final ComputationState state : statesToRetain) { + sharedBuffer.releaseNode(state.getPreviousBufferEntry()); } } else { computationStates.addAll(statesToRetain); } } - discardComputationStatesAccordingToStrategy(computationStates, result, afterMatchSkipStrategy); - - // prune shared buffer based on window length - if (windowTime > 0L) { - long pruningTimestamp = timestamp - windowTime; - - if (pruningTimestamp < timestamp) { - // the check is to guard against underflows + discardComputationStatesAccordingToStrategy( + sharedBuffer, computationStates, result, afterMatchSkipStrategy); - // remove all elements which are expired - // with respect to the window length - if (eventSharedBuffer.prune(pruningTimestamp)) { - nfaChanged = true; - } - } + if (event.getEvent() == null) { --- End diff -- I see. Then at least we should have a comment there explaining it. ---
[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6059#discussion_r194468194 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -296,42 +292,31 @@ public void resetNFAChanged() { if (shouldDiscardPath) { // a stop state was reached in this branch. release branch which results in removing previous event from // the buffer - for (final ComputationState state : statesToRetain) { - eventSharedBuffer.release( - NFAStateNameHandler.getOriginalNameFromInternal( - state.getPreviousState().getName()), - state.getEvent(), - state.getTimestamp(), - state.getCounter()); + for (final ComputationState state : statesToRetain) { + sharedBuffer.releaseNode(state.getPreviousBufferEntry()); } } else { computationStates.addAll(statesToRetain); } } - discardComputationStatesAccordingToStrategy(computationStates, result, afterMatchSkipStrategy); - - // prune shared buffer based on window length - if (windowTime > 0L) { - long pruningTimestamp = timestamp - windowTime; - - if (pruningTimestamp < timestamp) { - // the check is to guard against underflows + discardComputationStatesAccordingToStrategy( + sharedBuffer, computationStates, result, afterMatchSkipStrategy); - // remove all elements which are expired - // with respect to the window length - if (eventSharedBuffer.prune(pruningTimestamp)) { - nfaChanged = true; - } - } + if (event.getEvent() == null) { --- End diff -- Why here `==null`? I may be missing sth. ---
[GitHub] flink issue #6126: [FLINK-9530][metrics] Fix numRecords task metric for chai...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6126 If this is correct, then I suppose we can remove it altogether so that the code is also cleaner, right? ---
[GitHub] flink issue #6108: [FLINK-9367] [Streaming Connectors] Allow to do truncate(...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6108 @zhangxinyu1 as soon as this sink is ready, I believe that the existing File Source will be able to read the output of the Bucketing Sink. As far as bandwidth limitations are concerned, could you elaborate a bit on what you mean? You want to tell the source to read at speed X records/sec? ---
[GitHub] flink issue #6108: [FLINK-9367] [Streaming Connectors] Allow to do truncate(...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6108 Thanks for the useful input here, @zhangxinyu1 and @StephanEwen. As soon as I have something concrete, I will create the JIRA and post it here. ---
[GitHub] flink issue #6059: [Flink-9418] Migrate SharedBuffer to use MapState
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6059 Hi @dawidwys! Thanks for the work. The changes seem really good. As we discussed privately, I did a review and you can find the comments in the branch I sent you. The only things I would like to write here as a reminder are: 1) as a future commit, we could in some cases do the cleanup while discovering the partial matches, i.e. call `releaseNode()` from within `extractCurrentMatches()` (whenever appropriate); this could be a separate JIRA. 2) when migrating from an old version, we should ignore potential "dead-end" edges (edges that point to nodes that have been removed) in the partial match buffer. ---
[GitHub] flink pull request #6097: [FLINK-9470] Allow querying the key in KeyedProces...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6097#discussion_r191460728 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java --- @@ -50,6 +52,30 @@ @Rule public ExpectedException expectedException = ExpectedException.none(); + @Test + public void testKeyQuerying() throws Exception { + + KeyedProcessOperator, String> operator = + new KeyedProcessOperator<>(new KeyQueryingProcessFunction()); + + OneInputStreamOperatorTestHarness, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, (in) -> in.f0 , BasicTypeInfo.INT_TYPE_INFO); + --- End diff -- same here. ---
[GitHub] flink pull request #6097: [FLINK-9470] Allow querying the key in KeyedProces...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6097#discussion_r191460316 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java --- @@ -50,6 +52,30 @@ @Rule public ExpectedException expectedException = ExpectedException.none(); + @Test + public void testKeyQuerying() throws Exception { + + KeyedProcessOperator, String> operator = --- End diff -- The `OneInputStreamOperatorTestHarness` is `AutoCloseable`, so I would recommend going with ``` try(harness=...) { ... } ``` and removing the explicit call to `harness.close()`. This is a nice practice to start enforcing in new tests, as it cleans up any leaked resources in case of exceptions. ---
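Sketched out, the suggestion looks roughly like the following (the generic parameters and `KeyQueryingProcessFunction` come from the quoted test, whose type arguments were stripped in the diff rendering, so the exact signatures here are a best guess):

```java
KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
        new KeyedProcessOperator<>(new KeyQueryingProcessFunction());

try (OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
        new KeyedOneInputStreamOperatorTestHarness<>(operator, (in) -> in.f0, BasicTypeInfo.INT_TYPE_INFO)) {

    testHarness.open();
    // ... process elements and assert on the output ...
}   // no explicit testHarness.close(): it is invoked automatically, even if an assertion fails
```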
[GitHub] flink issue #6028: [FLINK-9356] Improve error message for when queryable sta...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/6028 LGTM, so +1 and I will merge later. Thanks for the work @yanghua and for the review @florianschmidt1994 . ---
[GitHub] flink pull request #6028: [FLINK-9356] Improve error message for when querya...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/6028#discussion_r189015635 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java --- @@ -225,7 +225,9 @@ private void executeActionAsync( return location; } else { - return FutureUtils.completedExceptionally(new UnknownLocationException("Could not contact the state location oracle to retrieve the state location.")); + return FutureUtils.completedExceptionally( + new UnknownLocationException("Could not contact the state location oracle to retrieve the state location for state=" + + queryableStateName + " of job=" + jobId + ", the caused reason maybe the state is not ready or there is no job exists.")); --- End diff -- This message is pretty verbose and it contains grammatical errors. If I were to write this, it should be something like: ``` Could not contact the state location oracle to retrieve the location for state=QSName of job=JobID. The reason can be that the state is not ready yet or that the job does not exist. ``` But before putting this in, it would be helpful if @florianschmidt1994 commented on what he thinks, as he is the one who opened the issue. ---
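For illustration, the corrected wording dropped into the line from the quoted diff could look like this (same fields and calls as in the diff; only the message text changes):

```java
return FutureUtils.completedExceptionally(
        new UnknownLocationException(
                "Could not contact the state location oracle to retrieve the location for state="
                        + queryableStateName + " of job=" + jobId
                        + ". The reason can be that the state is not ready yet, or that the job does not exist."));
```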
[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5955 Hi @tzulitai! Thanks for the review. I integrated most of your comments. The only one I left out is the one about merging the checkpointing and the checking. I am not against that; it is just that, the way it is now, it is aligned with the `StatefulJobSavepointMigrationITCase`. If I were to make the change, then we should change both, and I would prefer to do that in a separate commit. Let me know what you think about the current changes and, if you are OK with them, I can merge. ---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188614213 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends. 
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection> parameters () { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2 testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + env.setStateBa
[GitHub] flink issue #5922: [FLINK-8780] [docs] Add Broadcast State documentation.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5922 Thanks for the review! Merging this. ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r187924456 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,279 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and + 3. such an operator can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolves over time. + +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other +stream will contain the `Rules`. + +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will +make sure that elements of the same color end up on the same physical machine. + +{% highlight java %} +// key the shapes by color +KeyedStream colorPartitionedStream = shapeStream +.keyBy(new KeySelector(){...}); +{% endhighlight %} + +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules +will be stored. + +{% highlight java %} + +// a map descriptor to store the name of the rule (string) and the rule itself. +MapStateDescriptor ruleStateDescriptor = new MapStateDescriptor<>( + "RulesBroadcastState", + BasicTypeInfo.STRING_TYPE_INFO, + TypeInformation.of(new TypeHint() {}) + ); + +// broadcast the rules and create the broadcast state +BroadcastStream ruleBroadcastStream = ruleStream +.broadcast(ruleStateDescriptor); +{% endhighlight %} + +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to: +1) connect the two streams and +2) specify our match detecting logic. + +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the +non-broadcasted stream, with the `BroadcastStream` as an argument. 
This will return a `BroadcastConnectedStream`, on +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic. +The exact type of the function depends on the type of the non-broadcasted stream: + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`. + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. + + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls: + + + Attention: The connect should be called on the non-broadcasted stream, with the `BroadcastStream` + as an argument. + + +{% highlight java %} +DataStream output = colorPartitionedStream + .connect(ruleBroadcastStream) + .process( + + // type arguments in our KeyedBroadcas
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r187922328 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,279 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the +parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and + 3. such an operator can have *multiple broadcast states* with different names. + +## Provided APIs + +To show the provided APIs, we will start with an example before presenting their full functionality. As our running +example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs +of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that +the set of interesting patterns evolves over time. + +In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other +stream will contain the `Rules`. + +Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will +make sure that elements of the same color end up on the same physical machine. + +{% highlight java %} +// key the shapes by color +KeyedStream colorPartitionedStream = shapeStream +.keyBy(new KeySelector(){...}); +{% endhighlight %} + +Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks +should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast +the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules +will be stored. + +{% highlight java %} + +// a map descriptor to store the name of the rule (string) and the rule itself. +MapStateDescriptor ruleStateDescriptor = new MapStateDescriptor<>( + "RulesBroadcastState", + BasicTypeInfo.STRING_TYPE_INFO, + TypeInformation.of(new TypeHint() {}) + ); + +// broadcast the rules and create the broadcast state +BroadcastStream ruleBroadcastStream = ruleStream +.broadcast(ruleStateDescriptor); +{% endhighlight %} + +Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to: +1) connect the two streams and +2) specify our match detecting logic. + +Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the +non-broadcasted stream, with the `BroadcastStream` as an argument. 
This will return a `BroadcastConnectedStream`, on +which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic. +The exact type of the function depends on the type of the non-broadcasted stream: + - if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`. + - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. + + Given that our non-broadcasted stream is keyed, the following snippet includes the above calls: + + + Attention: The connect should be called on the non-broadcasted stream, with the `BroadcastStream` --- End diff -- Calling `connect()` on the `BroadcastStream` is not even available as an option in this case. The `broadcast(descriptor)` will give you a `BroadcastStream` back that has no available transformations. You can only use it as an argument for a `connect()`. ---
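A minimal sketch of the calls discussed above, assuming the `Item`, `Color` and `Rule` types of the running example and the `colorPartitionedStream`, `ruleBroadcastStream` and `ruleStateDescriptor` variables created earlier; the `String` output type and the `rule.getName()` accessor are illustrative assumptions, not the exact snippet from the documentation under review:

{% highlight java %}
DataStream<String> output = colorPartitionedStream
    .connect(ruleBroadcastStream)
    .process(new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

        @Override
        public void processElement(Item item, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            // read-only access to the broadcast state on the keyed side:
            // evaluate the incoming item against all currently stored rules
            for (Map.Entry<String, Rule> entry : ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
                final Rule rule = entry.getValue();
                // match-detecting logic against 'item' goes here, emitting results via out.collect(...)
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) throws Exception {
            // read-write access on the broadcast side: store/update the rule under its name
            ctx.getBroadcastState(ruleStateDescriptor).put(rule.getName(), rule);
        }
    });
{% endhighlight %}

Note that on the keyed side the broadcast state is exposed read-only, so updates to the rules can only happen in `processBroadcastElement()`.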
[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5955 Could you review it, @tzulitai? ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r187677168 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.util.Collector; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +/** + * Streaming application that creates an {@link Email} pojo with random ids and increasing + * timestamps and passes it to a stateful {@link org.apache.flink.api.common.functions.FlatMapFunction}, + * where it is exposed as queryable state. 
+ */ +public class QsStateProducer { + + public static final String QUERYABLE_STATE_NAME = "state"; + public static final String STATE_NAME = "state"; + + public static void main(final String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + ParameterTool tool = ParameterTool.fromArgs(args); + String tmpPath = tool.getRequired("tmp-dir"); + String stateBackendType = tool.getRequired("state-backend"); + + StateBackend stateBackend; + switch (stateBackendType) { + case "rocksdb": + stateBackend = new RocksDBStateBackend(tmpPath); + break; + case "fs": + stateBackend = new FsStateBackend(tmpPath); + break; + case "memory": + stateBackend = new MemoryStateBackend(); + break; + default: + throw new RuntimeException("Unsupported state backend " + stateBackendType); + } + + env.setStateBackend(stateBackend); + env.enableCheckpointing(1000L); + env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); + + env.addSource(new EmailSource()) + .keyBy(new KeySelector<Email, String>() { + + private static final long serialVersionUID = -1480525724620425363L; + + @Override + public String getKey(Email value) throws Exception { + return ""
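The quoted diff is cut off at this point; for orientation, the stateful flat map that the class javadoc refers to boils down to something like the sketch below, reusing the imports already listed in the file above. The `EmailId`/`EmailInformation` types and their accessors are assumptions for illustration, not the exact test code; the essential part is marking the `MapStateDescriptor` as queryable:

{% highlight java %}
private static class TestFlatMapFunction extends RichFlatMapFunction<Email, Object> {

    private transient MapState<EmailId, EmailInformation> state;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<EmailId, EmailInformation> stateDescriptor = new MapStateDescriptor<>(
                QsStateProducer.STATE_NAME,
                TypeInformation.of(new TypeHint<EmailId>() {}),
                TypeInformation.of(new TypeHint<EmailInformation>() {}));

        // this is what exposes the map state to the queryable state client
        stateDescriptor.setQueryable(QsStateProducer.QUERYABLE_STATE_NAME);

        state = getRuntimeContext().getMapState(stateDescriptor);
    }

    @Override
    public void flatMap(Email value, Collector<Object> out) throws Exception {
        // keep one entry per email id so that the client can count the keys
        state.put(value.getEmailId(), new EmailInformation(value));
    }
}
{% endhighlight %}

The `QsStateClient` used by the test scripts can then look the state up under the same `QUERYABLE_STATE_NAME`.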
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r187676881 --- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh --- @@ -0,0 +1,120 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/queryable_state_base.sh + +QUERYABLE_STATE_SERVER_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateProducer.jar +QUERYABLE_STATE_CLIENT_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateClient.jar + +# +# Test that queryable state works as expected with HA mode when restarting a taskmanager +# +# The general outline is like this: +# 1. start cluster in HA mode with 1 TM +# 2. start a job that exposes queryable state from a mapstate with increasing num. of keys +# 3. query the state with a queryable state client and expect no error to occur +# 4. stop the TM +# 5. check how many keys were in our mapstate at the time of the latest snapshot +# 6. start a new TM +# 7. query the state with a queryable state client and retrieve the number of elements +#in the mapstate +# 8. expect the number of elements in the mapstate after restart of TM to be > number of elements +#at last snapshot +# +# Globals: +# QUERYABLE_STATE_SERVER_JAR +# QUERYABLE_STATE_CLIENT_JAR +# Arguments: +# None +# Returns: +# None +# +function run_test() { +local EXIT_CODE=0 +local PARALLELISM=1 # parallelism of queryable state app +local PORT="9069" # port of queryable state server + +clean_stdout_files # to ensure there are no files accidentally left behind by previous tests +link_queryable_state_lib +start_ha_cluster + +local JOB_ID=$(${FLINK_DIR}/bin/flink run \ +-p ${PARALLELISM} \ +-d ${QUERYABLE_STATE_SERVER_JAR} \ +--state-backend "rocksdb" \ +--tmp-dir file://${TEST_DATA_DIR} \ +| awk '{print $NF}' | tail -n 1) + +wait_job_running ${JOB_ID} + +sleep 20 # sleep a little to have some state accumulated + +SERVER=$(get_queryable_state_server_ip) +PORT=$(get_queryable_state_proxy_port) + +echo SERVER: ${SERVER} +echo PORT: ${PORT} + +java -jar ${QUERYABLE_STATE_CLIENT_JAR} \ +--host ${SERVER} \ +--port ${PORT} \ +--iterations 1 \ +--job-id ${JOB_ID} + +if [ $? 
!= 0 ]; then +echo "An error occurred when executing queryable state client" +exit 1 +fi + +kill_random_taskmanager + +latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}') +echo "Latest snapshot count was ${latest_snapshot_count}" + +sleep 10 # this is a little longer than the heartbeat timeout so that the TM is gone + +start_and_wait_for_tm + +wait_job_running ${JOB_ID} + --- End diff -- Instead of just waiting for the job to be running, it is safer to ask through `REST` for the successful checkpoints for the job right after killing the TM, and then expecting to see more successful checkpoints after the new TM is up. This is safer because it guarantees that the backend is initialized properly and can be done similarly to how it is done in the case of the `test_ha.sh`. ---
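A hedged sketch of what the suggested REST-based check could look like; the `/jobs/<job-id>/checkpoints` endpoint and its `counts.completed` field are part of Flink's monitoring REST API, but the `jq` usage, the `localhost:8081` address and the exact placement in the script are assumptions, not the final test code:

{% highlight bash %}
# number of successfully completed checkpoints for the job, via the REST API
function get_completed_checkpoints() {
    local job_id=$1
    curl -s "http://localhost:8081/jobs/${job_id}/checkpoints" | jq -r '.counts.completed'
}

# record the count right after killing the TM ...
completed_before=$(get_completed_checkpoints ${JOB_ID})

start_and_wait_for_tm

# ... and instead of only waiting for the job to be RUNNING, wait until at least one
# new checkpoint has completed, which guarantees the backend is initialized properly
while [[ $(get_completed_checkpoints ${JOB_ID}) -le ${completed_before} ]]; do
    sleep 2
done
{% endhighlight %}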
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r187676635 --- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh --- @@ -0,0 +1,120 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/queryable_state_base.sh + +QUERYABLE_STATE_SERVER_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateProducer.jar +QUERYABLE_STATE_CLIENT_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateClient.jar + +# +# Test that queryable state works as expected with HA mode when restarting a taskmanager +# +# The general outline is like this: +# 1. start cluster in HA mode with 1 TM +# 2. start a job that exposes queryable state from a mapstate with increasing num. of keys +# 3. query the state with a queryable state client and expect no error to occur +# 4. stop the TM +# 5. check how many keys were in our mapstate at the time of the latest snapshot +# 6. start a new TM +# 7. query the state with a queryable state client and retrieve the number of elements +#in the mapstate +# 8. expect the number of elements in the mapstate after restart of TM to be > number of elements +#at last snapshot +# +# Globals: +# QUERYABLE_STATE_SERVER_JAR +# QUERYABLE_STATE_CLIENT_JAR +# Arguments: +# None +# Returns: +# None +# +function run_test() { +local EXIT_CODE=0 +local PARALLELISM=1 # parallelism of queryable state app +local PORT="9069" # port of queryable state server + +clean_stdout_files # to ensure there are no files accidentally left behind by previous tests +link_queryable_state_lib +start_ha_cluster + +local JOB_ID=$(${FLINK_DIR}/bin/flink run \ +-p ${PARALLELISM} \ +-d ${QUERYABLE_STATE_SERVER_JAR} \ +--state-backend "rocksdb" \ +--tmp-dir file://${TEST_DATA_DIR} \ +| awk '{print $NF}' | tail -n 1) + +wait_job_running ${JOB_ID} + +sleep 20 # sleep a little to have some state accumulated --- End diff -- Instead of waiting, it is safer to ask through `REST` for the successful checkpoints for the job right after killing the TM, and then expecting to see more successful checkpoints after the new TM is up. This is safer because it guarantees that the backend is initialized properly and can be done similarly to how it is done in the case of the `test_ha.sh`. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r187675826 --- Diff: flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh --- @@ -0,0 +1,120 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/queryable_state_base.sh + +QUERYABLE_STATE_SERVER_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateProducer.jar +QUERYABLE_STATE_CLIENT_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateClient.jar + +# +# Test that queryable state works as expected with HA mode when restarting a taskmanager +# +# The general outline is like this: +# 1. start cluster in HA mode with 1 TM +# 2. start a job that exposes queryable state from a mapstate with increasing num. of keys +# 3. query the state with a queryable state client and expect no error to occur +# 4. stop the TM +# 5. check how many keys were in our mapstate at the time of the latest snapshot +# 6. start a new TM +# 7. query the state with a queryable state client and retrieve the number of elements +#in the mapstate +# 8. expect the number of elements in the mapstate after restart of TM to be > number of elements +#at last snapshot +# +# Globals: +# QUERYABLE_STATE_SERVER_JAR +# QUERYABLE_STATE_CLIENT_JAR +# Arguments: +# None +# Returns: +# None +# +function run_test() { +local EXIT_CODE=0 +local PARALLELISM=1 # parallelism of queryable state app +local PORT="9069" # port of queryable state server + +clean_stdout_files # to ensure there are no files accidentally left behind by previous tests +link_queryable_state_lib +start_ha_cluster --- End diff -- We do not need an HA cluster. A normal cluster would be enough. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r187675012 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/pom.xml --- @@ -0,0 +1,134 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-queryable-state-test_${scala.binary.version} + flink-queryable-state-test + + jar + + + + org.apache.flink + flink-core + ${project.version} + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + + + org.apache.flink + flink-statebackend-rocksdb_2.11 + 1.6-SNAPSHOT --- End diff -- the version should be `${project.version}` ---
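Spelled out, the suggested change is simply the following; the XML element tags are reconstructed here because the markup was stripped from the quoted diff:

{% highlight xml %}
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
	<version>${project.version}</version>
</dependency>
{% endhighlight %}

This keeps the test module in lockstep with the parent build instead of hard-coding a snapshot version.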
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r187674099 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.util.Collector; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +/** + * Streaming application that creates an {@link Email} pojo with random ids and increasing + * timestamps and passes it to a stateful {@link org.apache.flink.api.common.functions.FlatMapFunction}, + * where it is exposed as queryable state. 
+ */ +public class QsStateProducer { + + public static final String QUERYABLE_STATE_NAME = "state"; + public static final String STATE_NAME = "state"; + + public static void main(final String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + ParameterTool tool = ParameterTool.fromArgs(args); + String tmpPath = tool.getRequired("tmp-dir"); + String stateBackendType = tool.getRequired("state-backend"); + + StateBackend stateBackend; + switch (stateBackendType) { + case "rocksdb": + stateBackend = new RocksDBStateBackend(tmpPath); + break; + case "fs": + stateBackend = new FsStateBackend(tmpPath); + break; + case "memory": + stateBackend = new MemoryStateBackend(); + break; + default: + throw new RuntimeException("Unsupported state backend " + stateBackendType); + } + + env.setStateBackend(stateBackend); + env.enableCheckpointing(1000L); + env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0); + + env.addSource(new EmailSource()) + .keyBy(new KeySelector<Email, String>() { + + private static final long serialVersionUID = -1480525724620425363L; + + @Override + public String getKey(Email value) throws Exception { + return ""
[GitHub] flink issue #5993: [FLINK-9336][state] fix deserialization problem for query...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5993 Thanks for the work @sihuazhou and for reporting this @florianschmidt1994! This fixes the problem described in the JIRA, but I will look at the PR a bit more thoroughly on Monday at the latest and then merge it if everything is ok. ---
[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5955 Could you review it, @aljoscha? ---
[GitHub] flink issue #5495: [FLINK-8659] Add migration itcases for broadcast state.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5495 I am closing this and will open an updated one. ---
[GitHub] flink pull request #5495: [FLINK-8659] Add migration itcases for broadcast s...
Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/5495 ---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/5955 [FLINK-8659] Add migration itcases for broadcast state. As the name implies, this PR adds migration tests for the newly introduced broadcast state. For the `scala` case, more refactoring is required so that the shared code between the tests is better distributed, but this is a broader refactoring. It requires the same work that was done for the previous case of the `java` migration tests. R @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink migration-inv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5955.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5955 commit 9ae20e37b557e9ca482bd61cb57e8a6001a7eb6e Author: kkloudas Date: 2018-05-03T08:05:13Z [FLINK-8659] Add migration itcases for broadcast state. ---
[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5860#discussion_r185731699 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -908,6 +929,20 @@ private void handlePendingFilesForPreviousCheckpoints(Map> pe return this; } + /** +* Sets the roll over interval in milliseconds. +* +* +* When a bucket part file is older than the roll over interval, a new bucket part file is +* started and the old one is closed. The name of the bucket file depends on the {@link Bucketer}. +* +* @param batchRolloverInterval The roll over interval in milliseconds +*/ + public BucketingSink setBatchRolloverInterval(long batchRolloverInterval) { + this.batchRolloverInterval = batchRolloverInterval; + return this; --- End diff -- Please add checks for invalid configs like negative values. ---
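Concretely, the requested validation could take a form like the sketch below; the generic parameter and the exact wording of the error message are assumptions, the point is simply to reject non-positive values before storing them:

{% highlight java %}
public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
    // reject invalid configurations early instead of silently accepting them
    if (batchRolloverInterval <= 0L) {
        throw new IllegalArgumentException(
            "The roll over interval must be a positive number of milliseconds.");
    }
    this.batchRolloverInterval = batchRolloverInterval;
    return this;
}
{% endhighlight %}

Flink's `Preconditions.checkArgument()` would work equally well here if the class already imports it.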
[GitHub] flink issue #5922: [FLINK-8780] [docs] Add Broadcast State documentation.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5922 Thanks a lot for the reviews! I pushed a commit that integrates your comments. Let me know if it looks ok now. ---
[GitHub] flink issue #5922: [FLINK-8780] [docs] Add Broadcast State documentation.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5922 This seems like a nice suggestion. I will do that and let's see how it reads afterwards. > On May 2, 2018, at 12:21 PM, Tzu-Li Tai wrote: > > @tzulitai commented on this pull request. > > In docs/dev/stream/state/broadcast_state.md <https://github.com/apache/flink/pull/5922#discussion_r185452174>: > > > + > +* ToC > +{:toc} > + > +[Working with State](state.html) described operator state which is either **evenly** distributed among the parallel > +tasks of an operator, or state which **upon restore**, its partial (task) states are **unioned** and the whole state is > +used to initialize the restored parallel tasks. > + > +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use-cases > +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally > +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a > +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all > +elements coming from another stream. Having the above type of use-cases in mind, broadcast state differs from the rest > +of operator states in that: > + 1. it has a map format, > + 2. it is only available to streams whose elements are *broadcasted*, > How about, > "it is only available to operators which have a broadcasted input stream"? > > This might only be a matter of personal preference, so please take this as a grain of salt. > It's just that I somehow find it easier to understand when thinking in terms of states and operators. > ---
[GitHub] flink issue #5922: [FLINK-8780] [docs] Add Broadcast State documentation.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5922 Thanks a lot for the comments @alpinegizmo and @tzulitai! I integrated most of them, but I am not sure how to integrate your comment, @tzulitai. Do you have a proposal on how this could be made clearer? ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5922#discussion_r185449653 --- Diff: docs/dev/stream/state/broadcast_state.md --- @@ -0,0 +1,281 @@ +--- +title: "The Broadcast State Pattern" +nav-parent_id: streaming_state +nav-pos: 2 +--- + + +* ToC +{:toc} + +[Working with State](state.html) described operator state which is either **evenly** distributed among the parallel +tasks of an operator, or state which **upon restore**, its partial (task) states are **unioned** and the whole state is +used to initialize the restored parallel tasks. + +A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use-cases +where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally +and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a +natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all +elements coming from another stream. Having the above type of use-cases in mind, broadcast state differs from the rest +of operator states in that: + 1. it has a map format, + 2. it is only available to streams whose elements are *broadcasted*, --- End diff -- Here I was only trying to point out that in order to have broadcast state, the elements of the stream should be broadcasted, *i.e.* `broadcast(stateDesc)`. I was not referring to the access rights. This is in contrast to the other types of state, which any stream can have, *e.g.* through the runtime context. ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184677692 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,538 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flinkâs Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. + +### Running SQL Queries + +Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it: + +{% highlight sql %} +SELECT 'Hello World' +{% endhighlight %} + +This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key. + +The CLI supports **two modes** for maintaining and visualizing results. + +The *table mode* materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: + +{% highlight text %} +SET execution.result-mode=table +{% endhighlight %} + +The *changelog mode* does not materialize results and visualizes the result stream that is produced by a continuous query [LINK] consisting of insertions (`+`) and retractions (`-`). 
+ +{% highlight text %} +SET execution.result-mode=changelog +{% endhighlight %} + +You can use the following query to see both result modes in action: + +{% highlight sql %} +SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name +{% endhighlight %} + +This query performs a bounded word count example. The following sections explain how to read from table sources and configure other table program properties. + +{% top %} + +Configuration +- + +The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs. + +{% highlight text %} +./bin/sql-client.sh embedded --help + +Mode "embedded" submits Flink jobs from the local machine. + + Syntax: embedded [OPTIONS] + "embedded" mode options: + -d,--defaults The environment properties with whi
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184678745 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,538 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flinkâs Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. + +### Running SQL Queries + +Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it: + +{% highlight sql %} +SELECT 'Hello World' +{% endhighlight %} + +This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key. + +The CLI supports **two modes** for maintaining and visualizing results. + +The *table mode* materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: + +{% highlight text %} +SET execution.result-mode=table +{% endhighlight %} + +The *changelog mode* does not materialize results and visualizes the result stream that is produced by a continuous query [LINK] consisting of insertions (`+`) and retractions (`-`). 
+ +{% highlight text %} +SET execution.result-mode=changelog +{% endhighlight %} + +You can use the following query to see both result modes in action: + +{% highlight sql %} +SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name +{% endhighlight %} + +This query performs a bounded word count example. The following sections explain how to read from table sources and configure other table program properties. + +{% top %} + +Configuration +- + +The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs. + +{% highlight text %} +./bin/sql-client.sh embedded --help + +Mode "embedded" submits Flink jobs from the local machine. + + Syntax: embedded [OPTIONS] + "embedded" mode options: + -d,--defaults The environment properties with whi
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184678478 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,538 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flinkâs Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. + +### Running SQL Queries + +Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it: + +{% highlight sql %} +SELECT 'Hello World' +{% endhighlight %} + +This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key. + +The CLI supports **two modes** for maintaining and visualizing results. + +The *table mode* materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: + +{% highlight text %} +SET execution.result-mode=table +{% endhighlight %} + +The *changelog mode* does not materialize results and visualizes the result stream that is produced by a continuous query [LINK] consisting of insertions (`+`) and retractions (`-`). 
+ +{% highlight text %} +SET execution.result-mode=changelog +{% endhighlight %} + +You can use the following query to see both result modes in action: + +{% highlight sql %} +SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name +{% endhighlight %} + +This query performs a bounded word count example. The following sections explain how to read from table sources and configure other table program properties. + +{% top %} + +Configuration +- + +The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs. + +{% highlight text %} +./bin/sql-client.sh embedded --help + +Mode "embedded" submits Flink jobs from the local machine. + + Syntax: embedded [OPTIONS] + "embedded" mode options: + -d,--defaults The environment properties with whi
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184676876 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,538 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flinkâs Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. + +### Running SQL Queries + +Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it: + +{% highlight sql %} +SELECT 'Hello World' +{% endhighlight %} + +This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key. + +The CLI supports **two modes** for maintaining and visualizing results. + +The *table mode* materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: + +{% highlight text %} +SET execution.result-mode=table +{% endhighlight %} + +The *changelog mode* does not materialize results and visualizes the result stream that is produced by a continuous query [LINK] consisting of insertions (`+`) and retractions (`-`). 
+ +{% highlight text %} +SET execution.result-mode=changelog +{% endhighlight %} + +You can use the following query to see both result modes in action: + +{% highlight sql %} +SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name +{% endhighlight %} + +This query performs a bounded word count example. The following sections explain how to read from table sources and configure other table program properties. + +{% top %} + +Configuration +- + +The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs. + +{% highlight text %} +./bin/sql-client.sh embedded --help + +Mode "embedded" submits Flink jobs from the local machine. + + Syntax: embedded [OPTIONS] + "embedded" mode options: + -d,--defaults The environment properties with whi
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184675777 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,538 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flinkâs Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. --- End diff -- I would remove the "This command starts the submission service and CLI embedded in one application process.". Because if not, you have to explain: What are these processes? What is the difference between submission service and CLI? ... ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184678308 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,538 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flinkâs Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. + +### Running SQL Queries + +Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it: + +{% highlight sql %} +SELECT 'Hello World' +{% endhighlight %} + +This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key. + +The CLI supports **two modes** for maintaining and visualizing results. + +The *table mode* materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: + +{% highlight text %} +SET execution.result-mode=table +{% endhighlight %} + +The *changelog mode* does not materialize results and visualizes the result stream that is produced by a continuous query [LINK] consisting of insertions (`+`) and retractions (`-`). 
+ +{% highlight text %} +SET execution.result-mode=changelog +{% endhighlight %} + +You can use the following query to see both result modes in action: + +{% highlight sql %} +SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name +{% endhighlight %} + +This query performs a bounded word count example. The following sections explain how to read from table sources and configure other table program properties. + +{% top %} + +Configuration +- + +The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs. + +{% highlight text %} +./bin/sql-client.sh embedded --help + +Mode "embedded" submits Flink jobs from the local machine. + + Syntax: embedded [OPTIONS] + "embedded" mode options: + -d,--defaults The environment properties with whi
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184676407 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,538 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flinkâs Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + +**Note:** The SQL Client is in an early developement phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based [SQL Client Gateway](sqlClient.html#limitations--future). + +* This will be replaced by the TOC +{:toc} + +Getting Started +--- + +This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out of the box. + +The SQL Client requires a running Flink cluster where table programs can be submitted to. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: + +{% highlight bash %} +./bin/start-cluster.sh +{% endhighlight %} + +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + +{% highlight bash %} +./bin/sql-client.sh embedded +{% endhighlight %} + +This command starts the submission service and CLI embedded in one application process. By default, the SQL Client will read its configuration from the environment file located in `./conf/sql-client-defaults.yaml`. See the [next part](sqlClient.html#environment-files) for more information about the structure of environment files. + +### Running SQL Queries + +Once the CLI has been started, you can use the `HELP` command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the `Enter` key to execute it: + +{% highlight sql %} +SELECT 'Hello World' +{% endhighlight %} + +This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the `Q` key. + +The CLI supports **two modes** for maintaining and visualizing results. + +The *table mode* materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: + +{% highlight text %} +SET execution.result-mode=table +{% endhighlight %} + +The *changelog mode* does not materialize results and visualizes the result stream that is produced by a continuous query [LINK] consisting of insertions (`+`) and retractions (`-`). 
+ +{% highlight text %} +SET execution.result-mode=changelog +{% endhighlight %} + +You can use the following query to see both result modes in action: + +{% highlight sql %} +SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name +{% endhighlight %} + +This query performs a bounded word count example. The following sections explain how to read from table sources and configure other table program properties. + +{% top %} + +Configuration +- + +The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs. + +{% highlight text %} +./bin/sql-client.sh embedded --help + +Mode "embedded" submits Flink jobs from the local machine. + + Syntax: embedded [OPTIONS] + "embedded" mode options: + -d,--defaults The environment properties with whi
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184673962 --- Diff: docs/dev/table/sqlClient.md --- @@ -0,0 +1,538 @@ +--- +title: "SQL Client" +nav-parent_id: tableapi +nav-pos: 100 +is_beta: true +--- + + + +Although Flink's Table & SQL API allows to declare queries in the SQL language. A SQL query needs to be embedded within a table program that is written either in Java or Scala. The table program needs to be packaged with a build tool before it can be submitted to a cluster. This limits the usage of Flink to mostly Java/Scala programmers. + +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. --- End diff -- "without a single line of **Java or Scala** code." - because you still write SQL. ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184675917 --- Diff: docs/dev/table/sqlClient.md --- [...] +The CLI supports **two modes** for maintaining and visualizing results. --- End diff -- Put the names of the two modes in **bold** so that the reader knows what to expect. ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184677353 --- Diff: docs/dev/table/sqlClient.md --- [...] (same sqlClient.md excerpt as quoted above) ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184676633 --- Diff: docs/dev/table/sqlClient.md --- [...] (same sqlClient.md excerpt as quoted above) ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184676032 --- Diff: docs/dev/table/sqlClient.md --- [...] +The CLI supports **two modes** for maintaining and visualizing results. + +The *table mode* materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI: + +{% highlight text %} +SET execution.result-mode=table +{% endhighlight %} + --- End diff -- I do not get the visualization of the CHANGELOG mode. Maybe a small example could help. ---
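A rough illustration of the changelog visualization requested above, using the bounded word-count query quoted earlier; the column layout is an assumption (only the `+`/`-` prefixes come from the docs text). Each incoming name produces an insertion, and an updated count is emitted as a retraction of the old row followed by an insertion of the new one:

```
+ Bob, 1
+ Alice, 1
+ Greg, 1
- Bob, 1
+ Bob, 2
```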
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184678944 --- Diff: docs/dev/table/sqlClient.md --- [...] (same sqlClient.md excerpt as quoted above) ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184675061 --- Diff: docs/dev/table/sqlClient.md --- [...] +### Starting the SQL Client CLI + +The SQL Client scripts are also located in the binary directory of Flink. You can start the CLI by calling: + --- End diff -- What is the `embedded`? What is embedded where? ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184674272 --- Diff: docs/dev/table/sqlClient.md --- [...] +The *SQL Client* aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of code. The *SQL Client CLI* allows for retrieving and visualizing real-time results from the running distributed application on the command line. + + + --- End diff -- Use either: ```Attention``` or ``` Note: ``` ---
[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5913#discussion_r184674886 --- Diff: docs/dev/table/sqlClient.md --- [...] +Getting Started +--- + --- End diff -- I would restructure the following two paragraphs along the lines of: ``` This section describes how to setup and run your first Flink SQL program from the command-line. The SQL Client is bundled in the regular Flink distribution and thus runnable out-of-the-box. It requires only a running Flink cluster where table programs can be executed. For more information about setting up a Flink cluster see the [deployment part of this documentation]({{ site.baseurl }}/ops/deployment/cluster_setup.html). If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command: ``` ---
[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/5922 [FLINK-8780] [docs] Add Broadcast State documentation. R @fhueske You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink broadcast-docs-inv2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5922.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5922 commit 678d6904e4560e8dcd6884e0b8f4d5cef61baa62 Author: kkloudas Date: 2018-04-26T10:23:58Z [FLINK-8780] [docs] Add Broadcast State documentation. ---
[GitHub] flink issue #5910: [FLINK-8841] [state] Remove HashMapSerializer and use Map...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5910 Thanks @yuqi1129, @bowenli86 and @StefanRRichter for the reviews. I integrated your comments. If you are done with reviewing, I will merge it as soon as Travis gives the green light. ---
[GitHub] flink pull request #5910: [FLINK-8841] [state] Remove HashMapSerializer and ...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/5910 [FLINK-8841] [state] Remove HashMapSerializer and use MapSerializer instead. ## What is the purpose of the change So far we had the `MapSerializer` and the `HashMapSerializer`. The two had almost identical code and the second was only used on the `HeapStateBackend`/`FSStateBackend` when creating a `MapState`. This PR removes the `HashMapSerializer` and replaces its uses with the `MapSerializer`. It also guarantees backwards compatibility. ## Brief change log It introduces the `MigrationUtil` as an inner class of the `InstantiationUtil`. This class contains mapping between deprecated/deleted serializers and their replacements. Also the removal of the `HashMapSerializer` uniformizes a bit the `HeapMapState` and the `RocksDBMapState`. ## Verifying this change Added the `HeapKeyedStateBackendSnapshotMigrationTest#testMapStateMigrationAfterHashMapSerRemoval()`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: yes - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable R @StefanRRichter or @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink map-serializer-inv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5910.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5910 commit 7de83de4765384080cea6d94b64a81e1584ce82e Author: kkloudas Date: 2018-04-24T12:48:34Z [FLINK-8841] Remove HashMapSerializer and use MapSerializer instead. ---
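The migration mapping described in this PR summary could look roughly like the following sketch. The class and method names here are illustrative assumptions, not the actual `InstantiationUtil.MigrationUtil` implementation: the idea is a lookup from the fully-qualified name of a removed serializer to its replacement class, consulted while deserializing older snapshots.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeutils.base.MapSerializer;

// Illustrative sketch only: maps class names of removed serializers to the
// classes that replace them when restoring from an older snapshot.
public final class SerializerMigrationSketch {

    private static final Map<String, Class<?>> REPLACEMENTS;

    static {
        Map<String, Class<?>> m = new HashMap<>();
        // The fully-qualified name of the removed HashMapSerializer is an assumption here.
        m.put("org.apache.flink.runtime.state.HashMapSerializer", MapSerializer.class);
        REPLACEMENTS = Collections.unmodifiableMap(m);
    }

    /** Resolves a serializer class name, substituting removed classes with their replacements. */
    public static Class<?> resolveClass(String className, ClassLoader cl) throws ClassNotFoundException {
        Class<?> replacement = REPLACEMENTS.get(className);
        return replacement != null ? replacement : Class.forName(className, false, cl);
    }

    private SerializerMigrationSketch() {}
}
```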
[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5482#discussion_r183745456 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java --- @@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector keySelector) { public WithWindow window(WindowAssigner, W> assigner) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null); } + + /** +* Specifies the time boundaries over which the join operation works, so that +* leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound +* By default both the lower and the upper bound are inclusive. This can be configured +* with {@link TimeBounded#lowerBoundExclusive(boolean)} and +* {@link TimeBounded#upperBoundExclusive(boolean)} +* +* @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound +* @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound +*/ + public TimeBounded between(Time lowerBound, Time upperBound) { + + TimeCharacteristic timeCharacteristic = + input1.getExecutionEnvironment().getStreamTimeCharacteristic(); + + if (timeCharacteristic != TimeCharacteristic.EventTime) { + throw new RuntimeException("Time-bounded stream joins are only supported in event time"); + } + + checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join"); + checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join"); + return new TimeBounded<>( + input1, + input2, + lowerBound.toMilliseconds(), + upperBound.toMilliseconds(), + true, + true, + keySelector1, + keySelector2 + ); + } + } + } + + /** +* Joined streams that have keys for both sides as well as the time boundaries over which +* elements should be joined defined. +* +* @param Input type of elements from the first stream +* @param Input type of elements from the second stream +* @param The type of the key +*/ + public static class TimeBounded { + + private static final String TIMEBOUNDED_JOIN_FUNC_NAME = "TimeBoundedJoin"; + + private final DataStream left; + private final DataStream right; + + private final long lowerBound; + private final long upperBound; + + private final KeySelector keySelector1; + private final KeySelector keySelector2; + + private boolean lowerBoundInclusive; + private boolean upperBoundInclusive; + + public TimeBounded( + DataStream left, + DataStream right, + long lowerBound, + long upperBound, + boolean lowerBoundInclusive, + boolean upperBoundInclusive, + KeySelector keySelector1, + KeySelector keySelector2) { + + this.left = Preconditions.checkNotNull(left); + this.right = Preconditions.checkNotNull(right); + + this.lowerBound = lowerBound; + this.upperBound = upperBound; + + this.lowerBoundInclusive = lowerBoundInclusive; + this.upperBoundInclusive = upperBoundInclusive; + + this.keySelector1 = Preconditions.checkNotNull(keySelector1); + this.keySelector2 = Preconditions.checkNotNull(keySelector2); + } + + /** +* Configure whether the upper bound should be considered exclusive or inclusive. +*/ + public TimeBounded upperBoundExclusive(boolean exclusive) {
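The draft builder quoted above could be exercised roughly as follows. This is a hedged sketch against the API visible in this diff (`between` and `upperBoundExclusive`); everything else, including whether the draft ultimately compiled and was merged in this shape, is an assumption.

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TimeBoundedJoinSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The draft rejects anything other than event time (see the check in between()).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> left = env.fromElements("a", "b", "c");
        DataStream<String> right = env.fromElements("a", "b", "c");

        // Identity key selector used on both sides of the join.
        KeySelector<String, String> identity = new KeySelector<String, String>() {
            @Override
            public String getKey(String value) {
                return value;
            }
        };

        // leftElement.timestamp - 2s <= rightElement.timestamp < leftElement.timestamp + 1s
        left.join(right)
            .where(identity)
            .equalTo(identity)
            .between(Time.seconds(-2), Time.seconds(1)) // draft API from this diff
            .upperBoundExclusive(true);                 // draft API from this diff

        // Applying the actual join function is omitted because that part of the
        // draft API is not visible in the quoted diff.
    }
}
```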
[GitHub] flink issue #5813: [FLINK-8980] [e2e] Add a BucketingSink end-to-end test
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5813 Sounds good @twalthr ! ---
[GitHub] flink issue #5813: [FLINK-8980] [e2e] Add a BucketingSink end-to-end test
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5813 Thanks for the work @twalthr ! The test looks good but it fails occasionally due to https://issues.apache.org/jira/browse/FLINK-9113. Given that the test is unstable, I would suggest to not merge it for now, as it will lead to unstable builds. ---
[GitHub] flink pull request #5830: [FLINK-9152] Harmonize BroadcastProcessFunction Co...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5830#discussion_r180541104 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java --- @@ -89,11 +89,11 @@ * query the current processing/event time, and also query and update the internal * {@link org.apache.flink.api.common.state.BroadcastState broadcast state}. In addition, it * can register a {@link KeyedStateFunction function} to be applied to all keyed states on -* the local partition. These can be done through the provided {@link Context}. +* the local partition. These can be done through the provided {@link BaseBroadcastProcessFunction.Context}. --- End diff -- Remove the `BaseBroadcastProcessFunction`. ---
[GitHub] flink pull request #5830: [FLINK-9152] Harmonize BroadcastProcessFunction Co...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5830#discussion_r180538022 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java --- @@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction.Context; --- End diff -- Unused import. ---
[GitHub] flink pull request #5830: [FLINK-9152] Harmonize BroadcastProcessFunction Co...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5830#discussion_r180541190 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java --- @@ -89,11 +89,11 @@ * query the current processing/event time, and also query and update the internal * {@link org.apache.flink.api.common.state.BroadcastState broadcast state}. In addition, it * can register a {@link KeyedStateFunction function} to be applied to all keyed states on -* the local partition. These can be done through the provided {@link Context}. +* the local partition. These can be done through the provided {@link BaseBroadcastProcessFunction.Context}. * The context is only valid during the invocation of this method, do not store it. * * @param value The stream element. -* @param ctx A {@link Context} that allows querying the timestamp of the element, +* @param ctx A {@link BaseBroadcastProcessFunction.Context} that allows querying the timestamp of the element, --- End diff -- Remove the `BaseBroadcastProcessFunction`. ---
[GitHub] flink issue #5811: [FLINK-9113] [connectors] Fix flushing behavior of bucket...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5811 Well it seems like for these tests, the `flush` is not actually flushing. The files are there, the `validPartLength` is correct (=6 as we just write `test1\n`) but the data is not actually on disk. If you call `close()` on the in-progress file when snapshotting, then the tests succeed and the data is there. I would recommend to just remove the check for now, and open a follow-up JIRA that contains the check that you will remove and also points to the discussion about HDFS not flushing, and then we can see how to proceed. I think that the fact that the end-to-end tests pass points to something being wrong with the FS abstraction. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827723 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. + */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; + + public void setStuff(List stuff) { + this.stuff = stuff; + } + + private List stuff; + + public void setAsdf(Long asdf) { + this.asdf = asdf; + } + + private Long asdf = 0L; + + private transient LabelSurrogate label; + + public EmailInformation() { + + } + + public EmailInformation(Email email) { + emailId = email.getEmailId(); + // timestamp = email.getTimestamp(); + stuff = new ArrayList<>(); + stuff.add("1"); + stuff.add("2"); + stuff.add("3"); + label = email.getLabel(); + } + + public EmailId getEmailId() { + return emailId; + } + +// //public Instant getTimestamp() { +// return timestamp; +// } + + public List getStuff() { + return stuff; + } + + public Long getAsdf() { + return asdf; + } + + public LabelSurrogate getLabel() { + return label; + } + + public void setLabel(LabelSurrogate label) { + this.label = label; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EmailInformation that = (EmailInformation) o; + return Objects.equals(emailId, that.emailId) && +// Objects.equals(timestamp, that.timestamp) && --- End diff -- remove. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178828967 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Collector; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +/** + * Javadoc. + */ +public class QsBugPoc { + + public static final String QUERYABLE_STATE_NAME = "state"; + public static final String STATE_NAME = "state"; + + public static void main(final String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + RocksDBStateBackend rocksDb = new RocksDBStateBackend("file:///tmp/deleteme-rocksdb"); + env.setStateBackend(rocksDb); --- End diff -- The dir to checkpoint can be a parameter and here it should be a path in the `TEST_DIR` of the test itself. In addition, everything should be explicitly cleaned up, e.g. checkpoints, potential output/input data, etc. ---
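A minimal sketch of the parameterization suggested here, assuming a hypothetical `--state-backend-dir` argument supplied by the test script; the rest of the job would stay as in the quoted `QsBugPoc` class.

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class QsBugPocParameterized {

    public static void main(String[] args) throws Exception {
        // Hypothetical flag, e.g. --state-backend-dir file://$TEST_DATA_DIR/rocksdb,
        // so the test script controls (and can clean up) the backend directory.
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String backendDir = params.getRequired("state-backend-dir");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new RocksDBStateBackend(backendDir));

        // ... source, flat map and queryable state setup as in the quoted class ...
    }
}
```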
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178828372 --- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh --- @@ -37,6 +37,14 @@ echo "Flink distribution directory: $FLINK_DIR" EXIT_CODE=0 --- End diff -- I would recommend to move it to the nightly tests. Queryable state is not a core component and the normal builds are already timing out. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827995 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +/** + * Javadoc. + */ +public class LabelSurrogate { + + private Type type; + private String foo; + + public LabelSurrogate(Type type, String foo) { + this.type = type; + this.foo = foo; + } + + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + public String getFoo() { + return foo; + } + + public void setFoo(String foo) { + this.foo = foo; + } + + @Override + public String toString() { + return "LabelSurrogate{" + + "type=" + type + + ", foo='" + foo + '\'' + + '}'; + } + + /** +* Javadoc. +*/ --- End diff -- Same here. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827936 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +/** + * Javadoc. + */ --- End diff -- This is just a placeholder comment for checkstyle verification to pass. Please write a real comment. This holds also for other places. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178829164 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsBugPoc.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Collector; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +/** + * Javadoc. + */ +public class QsBugPoc { + + public static final String QUERYABLE_STATE_NAME = "state"; + public static final String STATE_NAME = "state"; + + public static void main(final String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + RocksDBStateBackend rocksDb = new RocksDBStateBackend("file:///tmp/deleteme-rocksdb"); + env.setStateBackend(rocksDb); --- End diff -- Also check for different backends, i.e. file and rocks. You can have a look to the `test_ha.sh`. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827415 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. + */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { --- End diff -- Remove commented methods. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827698 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. + */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; + + public void setStuff(List stuff) { + this.stuff = stuff; + } + + private List stuff; + + public void setAsdf(Long asdf) { + this.asdf = asdf; + } + + private Long asdf = 0L; + + private transient LabelSurrogate label; + + public EmailInformation() { + + } + + public EmailInformation(Email email) { + emailId = email.getEmailId(); + // timestamp = email.getTimestamp(); --- End diff -- remove. ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827543 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. + */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; --- End diff -- same here (remove commented field). ---
[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5807#discussion_r178827678 --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.queryablestate; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Javadoc. + */ +public class EmailInformation implements Serializable { + + private static final long serialVersionUID = -8956979869800484909L; + + public void setEmailId(EmailId emailId) { + this.emailId = emailId; + } + + private EmailId emailId; + +// public void setTimestamp(Instant timestamp) { +// this.timestamp = timestamp; +// } + + //private Instant timestamp; + + public void setStuff(List stuff) { + this.stuff = stuff; + } + + private List stuff; + + public void setAsdf(Long asdf) { + this.asdf = asdf; + } + + private Long asdf = 0L; + + private transient LabelSurrogate label; + + public EmailInformation() { + + } + + public EmailInformation(Email email) { + emailId = email.getEmailId(); + // timestamp = email.getTimestamp(); + stuff = new ArrayList<>(); + stuff.add("1"); + stuff.add("2"); + stuff.add("3"); + label = email.getLabel(); + } + + public EmailId getEmailId() { + return emailId; + } + +// //public Instant getTimestamp() { --- End diff -- remove. ---
[GitHub] flink issue #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMachineEx...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5750 Thanks a lot @twalthr for the review! ---
[GitHub] flink pull request #5756: [FLINK-8813][flip6] Disallow PARALLELISM_AUTO_MAX ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5756#discussion_r178030866 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -93,6 +93,10 @@ private MiniClusterResource( this.enableClusterClient = enableClusterClient; } + public MiniClusterType getMiniClusterType() { --- End diff -- This is definitely not a big issue. I will merge. ---
[GitHub] flink pull request #5756: [FLINK-8813][flip6] Disallow PARALLELISM_AUTO_MAX ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5756#discussion_r15838 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java --- @@ -93,6 +93,10 @@ private MiniClusterResource( this.enableClusterClient = enableClusterClient; } + public MiniClusterType getMiniClusterType() { --- End diff -- Why not rename it to something like `isLegacyDeployment()` and return a `boolean`? This would also allow making `MiniClusterType` a private class, which is a better pattern. ---
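A sketch of the refactoring the comment suggests: expose the boolean question callers actually care about, so the enum can stay a private implementation detail. The enum constants and constructor shown here are illustrative, not the actual `MiniClusterResource` code:

```java
public class MiniClusterResource {

	// with the boolean accessor below, the enum no longer needs to be public
	private enum MiniClusterType { LEGACY, NEW }

	private final MiniClusterType miniClusterType;
	private final boolean enableClusterClient;

	private MiniClusterResource(MiniClusterType miniClusterType, boolean enableClusterClient) {
		this.miniClusterType = miniClusterType;
		this.enableClusterClient = enableClusterClient;
	}

	/** Replaces getMiniClusterType(): callers only need to know whether this is a legacy deployment. */
	public boolean isLegacyDeployment() {
		return miniClusterType == MiniClusterType.LEGACY;
	}
}
```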
[GitHub] flink issue #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMachineEx...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5750 Hi @walterddr, I addressed most of your comments. Feel free to have another look. ---
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5750#discussion_r177436350 --- Diff: flink-end-to-end-tests/test-scripts/common.sh --- @@ -59,9 +162,42 @@ function start_cluster { done } +function jm_watchdog() { +expectedJms=$1 +ipPort=$2 + +while true; do +runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`; +missingJms=$((expectedJms-runningJms)) +for (( c=0; c
[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5750#discussion_r177364376 --- Diff: flink-end-to-end-tests/test-scripts/common.sh --- @@ -39,6 +39,109 @@ cd $TEST_ROOT export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N) echo "TEST_DATA_DIR: $TEST_DATA_DIR" +function revert_default_config() { + +# first revert the conf/masters file +echo "localhost:8081" > ${FLINK_DIR}/conf/masters + +# and then the conf/flink-conf.yaml +sed 's/^//g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL + #== +# Common + #== + +jobmanager.rpc.address: localhost +jobmanager.rpc.port: 6123 +jobmanager.heap.mb: 1024 +taskmanager.heap.mb: 1024 +taskmanager.numberOfTaskSlots: 1 +parallelism.default: 1 + + #== +# Web Frontend + #== + +web.port: 8081 +EOL +} + +function create_ha_config() { + +# create the masters file (only one currently). +# This must have all the masters to be used in HA. +echo "localhost:8081" > ${FLINK_DIR}/conf/masters + +# clean up the dir that will be used for zookeeper storage +# (see high-availability.zookeeper.storageDir below) +if [ -e $TEST_DATA_DIR/recovery ]; then + echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..." + rm -rf $TEST_DATA_DIR/recovery +fi + +# then move on to create the flink-conf.yaml +sed 's/^//g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL + #== +# Common + #== + +jobmanager.rpc.address: localhost +jobmanager.rpc.port: 6123 +jobmanager.heap.mb: 1024 +taskmanager.heap.mb: 1024 +taskmanager.numberOfTaskSlots: 4 +parallelism.default: 1 + + #== +# High Availability + #== + +high-availability: zookeeper +high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/ +high-availability.zookeeper.quorum: localhost:2181 +high-availability.zookeeper.path.root: /flink +high-availability.cluster-id: /test_cluster_one + + #== +# Web Frontend + #== + +web.port: 8081 +EOL +} + +function start_ha_cluster { +echo "Setting up HA Cluster..." +create_ha_config +start_local_zk +start_cluster +} + +function start_local_zk { +# Parses the zoo.cfg and starts locally zk. + +# This is almost the same code as the +# /bin/start-zookeeper-quorum.sh without the SSH part and only running for localhost. --- End diff -- The problem is that the `zookeeper` script does not take into account the fact that we may want to launch `zookeeper` locally, so it always asks for `SSH` credentials. ---
[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5751 Hi @sihuazhou! I think that, for now, materializing the list of keys and then passing it to `process` is the best solution. But keep in mind that this is only for the `HeapKeyedStateBackend` and not for RocksDB, so the method should create the copy only there. ---
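A sketch of the idea in that comment, using a hypothetical `applyToAllKeys`-style helper (names and signature are illustrative, not the PR's actual API): on a heap-based backend the keys are copied into a list first, so that `process` can delete state for a key without invalidating the ongoing iteration, while a RocksDB-style backend iterates over a snapshot view and needs no copy.

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class ApplyToAllKeysSketch {

	/** Callback applied to every key; may modify (e.g. clear) the state for that key. */
	public interface KeyProcessor<K> {
		void process(K key) throws Exception;
	}

	public static <K> void applyToAllKeys(Stream<K> keys, boolean heapBackend, KeyProcessor<K> processor) throws Exception {
		if (heapBackend) {
			// materialize the keys first: the heap backend iterates over live state maps,
			// so deleting entries while streaming over them would break the iteration
			List<K> materialized = keys.collect(Collectors.toList());
			for (K key : materialized) {
				processor.process(key);
			}
		} else {
			// a RocksDB-style backend iterates over a snapshot, so no copy is needed
			Iterator<K> it = keys.iterator();
			while (it.hasNext()) {
				processor.process(it.next());
			}
		}
	}

	private ApplyToAllKeysSketch() {}
}
```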