robertwb commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r861994013


##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promisses and await as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {

Review Comment:
   This doesn't return the operator.



##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),

Review Comment:
   Yes. Done.



##########
sdks/typescript/src/apache_beam/examples/wordcount2.ts:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TODO: Should this be in a top-level examples dir, rather than under apache_beam.
+
+import * as beam from "../../apache_beam";
+import * as textio from "../io/textio";
+import { DirectRunner } from "../runners/direct_runner";
+
+import { count } from "../transforms/combiners";
+import { GroupBy } from "../transforms/group_and_combine";
+
+import { PortableRunner } from "../runners/portable_runner/runner";
+
+class CountElements extends beam.PTransform<
+  beam.PCollection<any>,
+  beam.PCollection<any>
+> {
+  expand(input: beam.PCollection<any>) {
+    return input
+      .map((e) => ({ element: e }))
+      .apply(new GroupBy("element").combining("element", count, "count"));
+  }
+}
+
+function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
+  return lines
+    .map((s: string) => s.toLowerCase())
+    .flatMap(function* (line: string) {
+      yield* line.split(/[^a-z]+/);
+    })
+    .apply(new CountElements("Count"));
+}
+
+async function main() {
+  // python apache_beam/runners/portability/local_job_service_main.py --port 3333
+  await new PortableRunner("localhost:3333").run(async (root) => {
+    const lines = await root.asyncApply(
+      new textio.ReadFromText("gs://dataflow-samples/shakespeare/kinglear.txt")

Review Comment:
   Done.



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   This is a two-level mapping; the first `if` makes sure the first level 
exists and the second checks the second level. 



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(

Review Comment:
   As I read the documentation, `spawnSync` waits until the process has completed,
which is not what we want here.



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {
+      await (
+        this.consumers.get(bundleId)!.get(transformId) as BufferingDataChannel
+      ).flush(consumer);
+    }
+    this.consumers.get(bundleId)!.set(transformId, consumer);
+  }
+
+  unregisterConsumer(bundleId: string, transformId: string) {
+    this.consumers.get(bundleId)!.delete(transformId);
+  }
+
+  getConsumer(bundleId: string, transformId: string): IDataChannel {
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (!this.consumers.get(bundleId)!.has(transformId)) {

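Review Comment:
   Same as in `registerConsumer`: this is a two-level mapping; the first `if`
makes sure the first level exists and the second checks the second level.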



##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+  assignWindows: (Instant) => W[];
+  windowCoder: () => Coder<W>;
+  toProto: () => runnerApi.FunctionSpec;
+  isMerging: () => boolean;
+  assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  static createWindowingStrategy(
+    pipeline: Pipeline,
+    windowFn: WindowFn<any>,
+    windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+  ): runnerApi.WindowingStrategy {
+    let result: runnerApi.WindowingStrategy;
+    if (windowingStrategyBase == undefined) {
+      result = {
+        windowFn: undefined!,
+        windowCoderId: undefined!,
+        mergeStatus: undefined!,
+        assignsToOneWindow: undefined!,
+        trigger: { trigger: { oneofKind: "default", default: {} } },
+        accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+        outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+        closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+        onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+        allowedLateness: BigInt(0),
+        environmentId: pipeline.defaultEnvironment,
+      };
+    } else {
+      result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+    }
+    result.windowFn = windowFn.toProto();
+    result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+    result.mergeStatus = windowFn.isMerging()
+      ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+      : runnerApi.MergeStatus_Enum.NON_MERGING;
+    result.assignsToOneWindow = windowFn.assignsToOneWindow();
+    return result;
+  }
+
+  constructor(
+    private windowFn: WindowFn<W>,
+    private windowingStrategyBase:
+      | runnerApi.WindowingStrategy
+      | undefined = undefined
+  ) {
+    super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_WINDOW_INTO_DOFN_URN,
+            payload: serializeFn({ windowFn: this.windowFn }),
+          }),
+        })
+      ),
+    });
+
+    const inputCoder = pipeline.context.getPCollectionCoderId(input);
+    return pipeline.createPCollectionInternal<T>(
+      inputCoder,
+      WindowInto.createWindowingStrategy(
+        pipeline,
+        this.windowFn,
+        this.windowingStrategyBase
+      )
+    );
+  }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  constructor(private func: (T, Instant) => typeof Instant) {
+    super();
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+            payload: serializeFn({ func: this.func }),
+          }),
+        })
+      ),
+    });
+

Review Comment:
   The payloads are different as well. I'm going to leave this as is, as 
there's also value in making this quasi-object-literal very transparent. 



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promisses and await as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {
+  registerOperatorConstructor(urn, (transformId, transformProto, context) => {
+    return new cls(transformId, transformProto, context);
+  });
+}
+
+export function registerOperatorConstructor(
+  urn: string,
+  constructor: OperatorConstructor
+) {
+  operatorsByUrn.set(urn, constructor);
+}
+
+////////// Actual operator implementation. //////////
+
+// NOTE: It may have been more idiomatic to use objects in closures satisfying
+// the IOperator interface here, but classes are used to make a clearer pattern
+// potential SDK authors that are less familiar with javascript.
+
+class DataSourceOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  receiver: Receiver;
+  coder: Coder<WindowedValue<unknown>>;
+  endOfData: Promise<void>;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const readPort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      readPort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+    this.coder = context.pipelineContext.getCoder(readPort.coderId);
+  }
+
+  async startBundle() {
+    const this_ = this;
+    var endOfDataResolve, endOfDataReject;
+    this.endOfData = new Promise(async (resolve, reject) => {
+      endOfDataResolve = resolve;
+      endOfDataReject = reject;
+    });
+
+    await this_.multiplexingDataChannel.registerConsumer(
+      this_.getBundleId(),
+      this_.transformId,
+      {
+        sendData: async function (data: Uint8Array) {
+          console.log("Got", data);
+          const reader = new protobufjs.Reader(data);
+          while (reader.pos < reader.len) {
+            const maybePromise = this_.receiver.receive(
+              this_.coder.decode(reader, CoderContext.needsDelimiters)
+            );
+            if (maybePromise != NonPromise) {
+              await maybePromise;
+            }
+          }
+        },
+        sendTimers: async function (timerFamilyId: string, timers: Uint8Array) {
+          throw Error("Not expecting timers.");
+        },
+        close: function () {
+          endOfDataResolve();
+        },
+        onError: function (error: Error) {
+          endOfDataReject(error);
+        },
+      }
+    );
+  }
+
+  process(wvalue: WindowedValue<unknown>): ProcessResult {
+    throw Error("Data should not come in via process.");
+  }
+
+  async finishBundle() {
+    try {
+      await this.endOfData;
+    } finally {
+      this.multiplexingDataChannel.unregisterConsumer(
+        this.getBundleId(),
+        this.transformId
+      );
+    }
+  }
+}
+
+registerOperator("beam:runner:source:v1", DataSourceOperator);
+
+class DataSinkOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  channel: IDataChannel;
+  coder: Coder<WindowedValue<unknown>>;
+  buffer: protobufjs.Writer;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const writePort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      writePort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.coder = context.pipelineContext.getCoder(writePort.coderId);
+  }
+
+  async startBundle() {
+    this.channel = this.multiplexingDataChannel.getSendChannel(
+      this.getBundleId(),
+      this.transformId
+    );
+    this.buffer = new protobufjs.Writer();
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    this.coder.encode(wvalue, this.buffer, CoderContext.needsDelimiters);
+    if (this.buffer.len > 1e6) {
+      return this.flush();
+    }
+    return NonPromise;
+  }
+
+  async finishBundle() {
+    await this.flush();
+    this.channel.close();
+  }
+
+  async flush() {
+    if (this.buffer.len > 0) {
+      await this.channel.sendData(this.buffer.finish());
+      this.buffer = new protobufjs.Writer();
+    }
+  }
+}
+
+registerOperator("beam:runner:sink:v1", DataSinkOperator);
+
+class FlattenOperator implements IOperator {
+  receiver: Receiver;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+  }
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+registerOperator("beam:transform:flatten:v1", FlattenOperator);
+
+class GenericParDoOperator implements IOperator {
+  private doFn: DoFn<unknown, unknown, unknown>;
+  private getStateProvider: () => StateProvider;
+  private sideInputInfo: Map<string, SideInputInfo> = new Map();
+  private originalContext: object | undefined;
+  private augmentedContext: object | undefined;
+  private paramProvider: ParamProviderImpl;
+
+  constructor(
+    private transformId: string,
+    private receiver: Receiver,
+    private spec: runnerApi.ParDoPayload,
+    private payload: {
+      doFn: DoFn<unknown, unknown, unknown>;
+      context: any;
+    },
+    transformProto: runnerApi.PTransform,
+    operatorContext: OperatorContext
+  ) {
+    this.doFn = payload.doFn;
+    this.originalContext = payload.context;
+    this.getStateProvider = operatorContext.getStateProvider;
+    this.sideInputInfo = createSideInputInfo(
+      transformProto,
+      spec,
+      operatorContext
+    );
+  }
+
+  async startBundle() {
+    this.paramProvider = new ParamProviderImpl(
+      this.transformId,
+      this.sideInputInfo,
+      this.getStateProvider
+    );
+    this.augmentedContext = this.paramProvider.augmentContext(
+      this.originalContext
+    );
+    if (this.doFn.startBundle) {
+      this.doFn.startBundle(this.augmentedContext);
+    }
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    if (this.augmentedContext && wvalue.windows.length != 1) {
+      // We need to process each window separately.
+      // TODO: (Perf) We could inspect the context more deeply and allow some
+      // cases to go through.
+      const result = new ProcessResultBuilder();
+      for (const window of wvalue.windows) {
+        result.add(
+          this.process({
+            value: wvalue.value,
+            windows: [window],
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      return result.build();
+    }
+
+    const this_ = this;
+    function reallyProcess(): ProcessResult {
+      const doFnOutput = this_.doFn.process(
+        wvalue.value,
+        this_.augmentedContext
+      );
+      if (!doFnOutput) {
+        return NonPromise;
+      }
+      const result = new ProcessResultBuilder();
+      for (const element of doFnOutput) {
+        result.add(
+          this_.receiver.receive({
+            value: element,
+            windows: wvalue.windows,
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      this_.paramProvider.update(undefined);
+      return result.build();
+    }
+
+    // Update the context with any information specific to this window.
+    const updateContextResult = this.paramProvider.update(wvalue);
+
+    // If we were able to do so without any deferred actions, process the
+    // element immediately.
+    if (updateContextResult == NonPromise) {
+      return reallyProcess();
+    } else {
+      // Otherwise return a promise that first waits for all the deferred
+      // actions to complete and then process the element.
+      return (async () => {
+        await updateContextResult;
+        const update2 = this.paramProvider.update(wvalue);
+        if (update2 != NonPromise) {
+          throw new Error("Expected all promises to be resolved: " + update2);
+        }
+        await reallyProcess();
+      })();
+    }
+  }
+
+  async finishBundle() {
+    if (this.doFn.finishBundle) {
+      const finishBundleOutput = this.doFn.finishBundle(this.augmentedContext);
+      if (!finishBundleOutput) {
+        return;
+      }
+      // The finishBundle method must return `void` or a Generator<WindowedValue<OutputT>>. It may not
+      // return Generator<OutputT> without windowing information because a single bundle may contain
+      // elements from different windows, so each element must specify its window.
+      for (const element of finishBundleOutput) {
+        const maybePromise = this.receiver.receive(element);
+        if (maybePromise != NonPromise) {
+          await maybePromise;
+        }
+      }
+    }
+  }
+}
+
+class IdentityParDoOperator implements IOperator {
+  constructor(private receiver: Receiver) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+class SplittingDoFnOperator implements IOperator {
+  constructor(
+    private splitter: (any) => string,
+    private receivers: { [key: string]: Receiver }
+  ) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const tag = this.splitter(wvalue.value);
+    const receiver = this.receivers[tag];
+    if (receiver) {
+      return receiver.receive(wvalue);
+    } else {
+      // TODO: (API) Make this configurable.
+      throw new Error(
+        "Unexpected tag '" +
+          tag +
+          "' for " +
+          wvalue.value +
+          " not in " +
+          [...Object.keys(this.receivers)]
+      );
+    }
+  }
+
+  async finishBundle() {}
+}
+
+class Splitting2DoFnOperator implements IOperator {
+  constructor(private receivers: { [key: string]: Receiver }) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const result = new ProcessResultBuilder();
+    // TODO: (API) Should I require exactly one instead of allowing a union?
+    for (const tag of Object.keys(wvalue.value as object)) {
+      const receiver = this.receivers[tag];
+      if (receiver) {
+        result.add(
+          receiver.receive({
+            value: (wvalue.value as object)[tag],
+            windows: wvalue.windows,
+            timestamp: wvalue.timestamp,
+            pane: wvalue.pane,
+          })
+        );
+      } else {
+        // TODO: (API) Make this configurable.
+        throw new Error(
+          "Unexpected tag '" +
+            tag +
+            "' for " +
+            wvalue.value +
+            " not in " +
+            [...Object.keys(this.receivers)]
+        );
+      }
+    }
+    return result.build();
+  }
+
+  async finishBundle() {}
+}
+
+class AssignWindowsParDoOperator implements IOperator {
+  constructor(private receiver: Receiver, private windowFn: WindowFn<Window>) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const newWindowsOnce = this.windowFn.assignWindows(wvalue.timestamp);
+    if (newWindowsOnce.length > 0) {
+      const newWindows: Window[] = [];
+      for (var i = 0; i < wvalue.windows.length; i++) {
+        newWindows.push(...newWindowsOnce);

Review Comment:
   That is correct. Added a comment to clarify.



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();
+      throw new Error(
+        "Timed out waiting for service after " + timeoutMs + "ms."
+      );
+    }
+  }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+  gradleTarget: string,
+  args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+  return async () => {
+    return new JavaJarService(
+      await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+      args
+    );
+  };
+}
+
+export class JavaJarService extends SubprocessService {
+  static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";;
+  static BEAM_GROUP_ID = "org.apache.beam";
+  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+  constructor(jar: string, args: string[] | undefined = undefined) {
+    if (args == undefined) {
+      // TODO: (Extension) Should filesToStage be set at some higher level?
+      args = ["{{PORT}}", "--filesToStage=" + jar];
+    }
+    super("java", ["-jar", jar].concat(args));
+  }
+
+  static async cachedJar(
+    urlOrPath: string,
+    cacheDir: string = JavaJarService.JAR_CACHE
+  ): Promise<string> {
+    if (urlOrPath.match(/^https?:\/\//)) {
+      fs.mkdirSync(cacheDir, { recursive: true });
+      const dest = path.join(
+        JavaJarService.JAR_CACHE,
+        path.basename(urlOrPath)
+      );
+      if (fs.existsSync(dest)) {
+        return dest;
+      }
+      // TODO: (Cleanup) Use true temporary file.
+      const tmp = dest + ".tmp" + Math.random();
+      return new Promise((resolve, reject) => {
+        const fout = fs.createWriteStream(tmp);
+        console.log("Downloading", urlOrPath);
+        const request = https.get(urlOrPath, function (response) {
+          if (response.statusCode !== 200) {
+            reject(
+              `Error code ${response.statusCode} when downloading ${urlOrPath}`
+            );
+          }
+          response.pipe(fout);
+          fout.on("finish", function () {
+            fout.close(() => {
+              fs.renameSync(tmp, dest);
+              resolve(dest);
+            });
+          });
+        });
+      });
+    } else {
+      return urlOrPath;
+    }
+  }
+
+  static gradleToJar(
+    gradleTarget: string,
+    appendix: string | undefined = undefined,
+    version: string = beamVersion
+  ): string {
+    if (version.startsWith("0.")) {
+      // node-ts 0.x corresponds to Beam 2.x.
+      version = "2" + version.substring(1);
+    }
+    version = "2.36.0";

Review Comment:
   Yep. Leftover from debugging. Removed.



##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);

Review Comment:
   Good call on both fronts. Done.



##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);
+      }
+    });
+    this.controlChannel.on("end", () => {
+      console.log("Control channel closed.");
+      for (const dataChannel of this.dataChannels.values()) {
+        dataChannel.close();
+      }
+      for (const stateChannel of this.stateChannels.values()) {
+        stateChannel.close();
+      }
+    });
+  }
+
+  async wait() {
+    // TODO: Await closing of control log.
+    await new Promise((r) => setTimeout(r, 1e9));
+  }
+
+  respond(response: InstructionResponse) {
+    this.controlChannel.write(response);
+  }
+
+  async process(request) {
+    const descriptorId =
+      request.request.processBundle.processBundleDescriptorId;
+    console.log("process", request.instructionId, descriptorId);
+    try {
+      if (!this.processBundleDescriptors.has(descriptorId)) {
+        const call = this.controlClient.getProcessBundleDescriptor(
+          {
+            processBundleDescriptorId: descriptorId,
+          },
+          (err, value: ProcessBundleDescriptor) => {
+            if (err) {
+              this.respond({
+                instructionId: request.instructionId,
+                error: "" + err,
+                response: {
+                  oneofKind: "processBundle",
+                  processBundle: {
+                    residualRoots: [],
+                    monitoringInfos: [],
+                    requiresFinalization: false,
+                    monitoringData: {},
+                  },
+                },
+              });
+            } else {
+              this.processBundleDescriptors.set(descriptorId, value);
+              this.process(request);
+            }
+          }
+        );
+        return;
+      }
+
+      const processor = this.aquireBundleProcessor(descriptorId);
+      await processor.process(request.instructionId);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "",
+        response: {
+          oneofKind: "processBundle",
+          processBundle: {
+            residualRoots: [],
+            monitoringInfos: [],
+            requiresFinalization: false,
+            monitoringData: {},
+          },
+        },
+      });
+      this.returnBundleProcessor(processor);
+    } catch (error) {
+      console.error("PROCESS ERROR", error);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "" + error,
+        response: { oneofKind: undefined },
+      });
+    }
+  }
+
+  aquireBundleProcessor(descriptorId: string) {
+    if (!this.bundleProcessors.has(descriptorId)) {
+      this.bundleProcessors.set(descriptorId, []);
+    }
+    const processor = this.bundleProcessors.get(descriptorId)?.pop();
+    if (processor != undefined) {
+      return processor;
+    } else {
+      return new BundleProcessor(
+        this.processBundleDescriptors.get(descriptorId)!,
+        this.getDataChannel.bind(this),
+        this.getStateChannel.bind(this)
+      );
+    }
+  }
+
+  returnBundleProcessor(processor: BundleProcessor) {
+    this.bundleProcessors.get(processor.descriptor.id)?.push(processor);

Review Comment:
   Yes. Done.



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();
+      throw new Error(
+        "Timed out waiting for service after " + timeoutMs + "ms."
+      );
+    }
+  }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+  gradleTarget: string,
+  args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+  return async () => {
+    return new JavaJarService(
+      await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+      args
+    );
+  };
+}
+
+export class JavaJarService extends SubprocessService {
+  static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";;
+  static BEAM_GROUP_ID = "org.apache.beam";
+  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+  constructor(jar: string, args: string[] | undefined = undefined) {
+    if (args == undefined) {
+      // TODO: (Extension) Should filesToStage be set at some higher level?
+      args = ["{{PORT}}", "--filesToStage=" + jar];
+    }
+    super("java", ["-jar", jar].concat(args));
+  }
+
+  static async cachedJar(
+    urlOrPath: string,
+    cacheDir: string = JavaJarService.JAR_CACHE
+  ): Promise<string> {
+    if (urlOrPath.match(/^https?:\/\//)) {
+      fs.mkdirSync(cacheDir, { recursive: true });
+      const dest = path.join(
+        JavaJarService.JAR_CACHE,
+        path.basename(urlOrPath)
+      );
+      if (fs.existsSync(dest)) {
+        return dest;
+      }
+      // TODO: (Cleanup) Use true temporary file.
+      const tmp = dest + ".tmp" + Math.random();
+      return new Promise((resolve, reject) => {
+        const fout = fs.createWriteStream(tmp);
+        console.log("Downloading", urlOrPath);
+        const request = https.get(urlOrPath, function (response) {
+          if (response.statusCode !== 200) {
+            reject(
+              `Error code ${response.statusCode} when downloading ${urlOrPath}`
+            );
+          }
+          response.pipe(fout);
+          fout.on("finish", function () {
+            fout.close(() => {
+              fs.renameSync(tmp, dest);
+              resolve(dest);
+            });
+          });
+        });
+      });
+    } else {
+      return urlOrPath;
+    }
+  }
+
+  static gradleToJar(
+    gradleTarget: string,
+    appendix: string | undefined = undefined,
+    version: string = beamVersion
+  ): string {
+    if (version.startsWith("0.")) {
+      // node-ts 0.x corresponds to Beam 2.x.
+      version = "2" + version.substring(1);
+    }
+    version = "2.36.0";
+    const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1];
+    const artifactId = "beam-" + gradlePackage.replaceAll(":", "-");
+    const projectRoot = path.resolve(
+      __dirname,
+      "..",
+      "..",
+      "..",
+      "..",
+      "..",
+      ".."
+    );

Review Comment:
   I agree. For now I added a TODO.



##########
sdks/typescript/boot.go:
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "context"
+       "flag"
+       "log"
+       "os"
+       "strings"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+)
+
+var (
+       // Contract: https://s.apache.org/beam-fn-api-container-contract.
+
+       id                = flag.String("id", "", "Local identifier (required).")
+       loggingEndpoint   = flag.String("logging_endpoint", "", "Local logging endpoint for FnHarness (required).")
+       artifactEndpoint  = flag.String("artifact_endpoint", "", "Local artifact endpoint for FnHarness (required).")
+       provisionEndpoint = flag.String("provision_endpoint", "", "Local provision endpoint for FnHarness (required).")
+       controlEndpoint   = flag.String("control_endpoint", "", "Local control endpoint for FnHarness (required).")
+       semiPersistDir    = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
+)
+
+const entrypoint = "dist/worker/worker_main.js"
+
+func main() {
+       flag.Parse()
+       if *id == "" {
+               log.Fatal("No id provided.")
+       }
+       if *provisionEndpoint == "" {
+               log.Fatal("No provision endpoint provided.")
+       }
+
+       ctx := grpcx.WriteWorkerID(context.Background(), *id)
+
+       info, err := provision.Info(ctx, *provisionEndpoint)
+       if err != nil {
+               log.Fatalf("Failed to obtain provisioning information: %v", err)
+       }
+       log.Printf("Provision info:\n%v", info)
+
+       // TODO(BEAM-8201): Simplify once flags are no longer used.

Review Comment:
   I'm not sure if all the cleanup has been done on the Dataflow side. @ihji 



##########
sdks/typescript/README.md:
##########
@@ -0,0 +1,208 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Typescript Beam SDK
+
+This is the start of a fully functioning Javascript (actually, Typescript) SDK.
+There are two distinct aims with this SDK:
+
+1. Tap into the large (and relatively underserved, by existing data processing
+frameworks) community of javascript developers with a native SDK targeting this language.
+
+1. Develop a new SDK which can serve both as a proof of concept and reference
+that highlights the (relative) ease of porting Beam to new languages,
+a differentiating feature of Beam and Dataflow.
+
+To accomplish this, we lean heavily on the portability framework.
+For example, we make heavy use of cross-language transforms,
+in particular for IOs.
+In addition, the direct runner is simply an extension of the worker suitable
+for running on portable runners such as the ULR, which will directly transfer
+to running on production runners such as Dataflow and Flink.
+The target audience should hopefully not be put off by running other-language
+code encapsulated in docker images.
+
+## API
+
+We generally try to apply the concepts from the Beam API in a Typescript
+idiomatic way, but it should be noted that few of the initial developers
+have extensive (if any) Javascript/Typescript development experience, so
+feedback is greatly appreciated.
+
+In addition, some notable departures are taken from the traditional SDKs:
+
+* We take a "relational foundations" approach, where
+[schema'd data](https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit#heading=h.puuotbien1gf)
+is the primary way to interact with data, and we generally eschew the key-value
+requiring transforms in favor of a more flexible approach naming fields or
+expressions. Javascript's native Object is used as the row type.
+
+* As part of being schema-first we also de-emphasize Coders as a first-class
+concept in the SDK, relegating it to an advanced feature used for interop.
+Though we can infer schemas from individual elements, it is still TBD to
+figure out if/how we can leverage the type system and/or function introspection
+to regularly infer schemas at construction time. A fallback coder using BSON
+encoding is used when we don't have sufficient type information.
+
+* We have added additional methods to the PCollection object, notably `map`
+and `flatMap`, [rather than only allowing apply](https://www.mail-archive.com/dev@beam.apache.org/msg06035.html).
+In addition, `apply` can accept a function argument `(PCollection) => ...` as
+well as a PTransform subclass, which treats this callable as if it were a
+PTransform's expand.
+
+* In the other direction, we have eliminated the
+[problematic Pipeline object](https://s.apache.org/no-beam-pipeline)
+from the API, instead providing a `Root` PValue on which pipelines are built,
+and invoking run() on a Runner.  We offer a less error-prone `Runner.run`
+which finishes only when the pipeline is completely finished as well as
+`Runner.runAsync` which returns a handle to the running pipeline.
+
+* Rather than introduce PCollectionTuple, PCollectionList, etc. we let PValue
+literally be an
+[array or object with PValue values](https://github.com/robertwb/beam-javascript/blob/de4390dd767f046903ac23fead5db333290462db/sdks/node-ts/src/apache_beam/pvalue.ts#L116)
+which transforms can consume or produce.
+These are applied by wrapping them with the `P` operator, e.g.
+`P([pc1, pc2, pc3]).apply(new Flatten())`.
+
+* Like Python, `flatMap` and `ParDo.process` return multiple elements by
+yielding them from a generator, rather than invoking a passed-in callback.
+TBD how to output to multiple distinct PCollections.
+There is currently an operation to split a PCollection into multiple
+PCollections based on the properties of the elements, and
+we may consider using a callback for side outputs.
+
+* The `map`, `flatMap`, and `ParDo.process` methods take an additional
+(optional) context argument, which is similar to the keyword arguments
+used in Python. These can be "ordinary" javascript objects (which are passed
+as is) or special DoFnParam objects which provide getters to element-specific
+information (such as the current timestamp, window, or side input) at runtime.
+
+* Javascript supports (and encourages) an asynchronous programming model, with
+many libraries requiring use of the async/await paradigm.
+As there is no way (by design) to go from the asynchronous style back to
+the synchronous style, this needs to be taken into account
+when designing the API.
+We currently offer asynchronous variants of `PValue.apply(...)` (in addition
+to the synchronous ones, as they are easier to chain) as well as making
+`Runner.run` asynchronous. TBD to do this for all user callbacks as well.
+
+An example pipeline can be found at https://github.com/robertwb/beam-javascript/blob/javascript/sdks/node-ts/src/apache_beam/examples/wordcount.ts
+
+## TODO
+
+This SDK is a work in progress. In January 2022 we developed the ability to
+construct and run basic pipelines (including external transforms and running
+on a portable runner) but the following big-ticket items remain.
+
+* Containerization
+
+  * Function and object serialization: we currently only support "loopback"
+  mode; to be able to run in a remote, distributed manner we need to finish up
+  the work in pickling closures and DoFn objects. Some investigation has been
+  started here, but all existing libraries have non-trivial drawbacks.
+
+  * Finish the work in building a full SDK container image that starts
+  the worker.
+
+  * Actually use worker threads for multiple bundles.
+
+* API
+
+  * There are several TODOs of minor features or design decisions to finalize.
+
+    * Consider using (or supporting) 2-arrays rather than {key, value} objects
+      for KVs.
+
+    * Consider renaming map/flatMap to doMap/doFlatMap to avoid confusion with
+    Array.map that takes a key as a second callback argument.
+    Or force the second argument to be an Object, which would lead to a less
+    confusing API and clean up the implementation.
+    Also add a [do]Filter, and possibly a [do]Reduce?
+
+    * Move away from using classes.
+
+  * Add the ability to set good PTransform names, and ideally infer good
+  defaults.
+
+  * Advanced features like metrics, state, timers, and SDF.
+  Possibly some of these can wait.
+
+* Infrastructure
+
+  * Gradle and Jenkins integration for tests and style enforcement.
+
+* Other
+
+  * Enforce unique names for pipeline update.
+
+  * PipelineOptions should be a Javascript Object, not a proto Struct.
+
+  * Though Dataflow Runner v2 supports portability, submission is still done
+  via v1beta3 and interaction with GCS rather than the job submission API.
+
+  * Cleanup uses of var, this. Arrow functions. `===` vs `==`.
+
+  * Avoid `any` return types (and re-enable check in compiler).
+
+  * Relative vs. absolute imports, possibly via setting a base url with a
+  `jsconfig.json`.  Also remove imports from base.ts.
+
+  * More/better tests, including tests of illegal/unsupported use.
+
+  * Set channel options like `grpc.max_{send,receive}_message_length` as we
+  do in other SDKs.
+
+  * Reduce use of any.
+
+    * Could use `unknown` in its place where the type is truly unknown.
+
+    * It'd be nice to enforce, maybe re-enable `noImplicitAny: true` in
+    tsconfig if we can get the generated proto files to be ignored.
+
+  * Enable a linter like eslint and fix at least the low hanging fruit.
+
+There is probably more; there are many TODOs littered throughout the code.
+
+This code has also not yet been fully peer reviewed (it was the result of a
+hackathon) which needs to be done before putting it into the main repository.
+
+
+## Development.

Review Comment:
   Yes. Excellent.



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();

Review Comment:
   I agree. Done.



##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),
+  extractOutput: (acc: any) => acc,
+};
+
+export const min: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc > i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a < b ? a : b)),

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to