hi all,

找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑,
但是下方代码运用了源码内部的private方法, 看起来不允许外部调用:

def collect[T](
    tEnv: TableEnvironment,
    table: Table,
    sink: CollectTableSink[T],
    jobName: Option[String]): Seq[T] = {
  val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
    .asInstanceOf[TypeInformation[T]]
    .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
      .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
  val id = new AbstractID().toString
  sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
  val sinkName = UUID.randomUUID().toString
  tEnv.registerTableSink(sinkName, sink)
  tEnv.insertInto(table, sinkName)

  val res = tEnv.execute("test")
  val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
  SerializedListAccumulator.deserializeList(accResult, typeSerializer)
}


jun su <sujun891...@gmail.com> 于2020年4月24日周五 下午2:05写道:

> hi all,
>
> blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
> 结果用于代码调试么?
>
> --
> Best,
> Jun Su
>


-- 
Best,
Jun Su

回复