http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullSideInputReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullSideInputReader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullSideInputReader.java deleted file mode 100644 index 3da2dc1..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullSideInputReader.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.collect.Sets; -import java.util.Collections; -import java.util.Set; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * A {@link SideInputReader} representing a well-defined set of views, but not storing - * any values for them. Used to check if a side input is present when the data itself - * comes from elsewhere. - */ -public class NullSideInputReader implements SideInputReader { - - private Set<PCollectionView<?>> views; - - public static NullSideInputReader empty() { - return new NullSideInputReader(Collections.<PCollectionView<?>>emptySet()); - } - - public static NullSideInputReader of(Iterable<? extends PCollectionView<?>> views) { - return new NullSideInputReader(views); - } - - private NullSideInputReader(Iterable<? extends PCollectionView<?>> views) { - this.views = Sets.newHashSet(views); - } - - @Override - public <T> T get(PCollectionView<T> view, BoundedWindow window) { - throw new IllegalArgumentException("cannot call NullSideInputReader.get()"); - } - - @Override - public boolean isEmpty() { - return views.isEmpty(); - } - - @Override - public <T> boolean contains(PCollectionView<T> view) { - return views.contains(view); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java deleted file mode 100644 index cb38a55..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReadyCheckingSideInputReader.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has - * had its contents set in a window. - */ -public interface ReadyCheckingSideInputReader extends SideInputReader { - /** - * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. - */ - boolean isReady(PCollectionView<?> view, BoundedWindow window); -} - http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SideInputReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SideInputReader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SideInputReader.java deleted file mode 100644 index e81c704..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SideInputReader.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * The interface to objects that provide side inputs. Particular implementations - * may read a side input directly or use appropriate sorts of caching, etc. - */ -public interface SideInputReader { - /** - * Returns the value of the given {@link PCollectionView} for the given {@link BoundedWindow}. - * - * <p>It is valid for a side input to be {@code null}. It is <i>not</i> valid for this to - * return {@code null} for any other reason. - */ - @Nullable - <T> T get(PCollectionView<T> view, BoundedWindow window); - - /** - * Returns true if the given {@link PCollectionView} is valid for this reader. - */ - <T> boolean contains(PCollectionView<T> view); - - /** - * Returns true if there are no side inputs in this reader. - */ - boolean isEmpty(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 46bd6d4..fd9f0df 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -53,12 +53,12 @@ import org.apache.beam.runners.core.BoundedSourceRunner; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag;