I'm implementing a data analysis pipeline in Flink and I have a problem 
converting a DataStream to a Table. I have this table defined from a join 
between two Kafka sources:

    Table legalFileEventsTable = legalFilesTable.join(eventsTable)
            .where($("id").isEqual($("id_fascicolo")))
            .select(
                    $("id").as("id_fascicolo"),
                    $("id_evento"),
                    $("giudice"),
                    $("nrg"),
                    $("codice_oggetto"),
                    $("ufficio"),
                    $("sezione"),
                    $("data_evento"),
                    $("evento"),
                    $("data_registrazione_evento")
            );
Then I convert the joined table to a DataStream to apply some computation on 
the data. Here's the code I'm using:

    DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
            .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
            .process(new PhaseDurationCounterProcessFunction());
    phasesDurationsDataStream.print();
The PhaseDurationCounterProcessFunction emits a Row like this:

    Row outputRow = Row.withNames(RowKind.INSERT);
    outputRow.setField("id_fascicolo", currentState.getId_fascicolo());
    outputRow.setField("nrg", currentState.getNrg());
    outputRow.setField("giudice", currentState.getGiudice());
    outputRow.setField("codice_oggetto", currentState.getCodice_oggetto());
    outputRow.setField("ufficio", currentState.getUfficio());
    outputRow.setField("sezione", currentState.getSezione());
    outputRow.setField("fase", currentState.getPhase());
    outputRow.setField("fase_completata", false);
    outputRow.setField("durata", currentState.getDurationCounter());
    out.collect(outputRow);
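For context on what the runtime sees here: Row.withNames() only carries field names at runtime, so the compile-time output type of the process function is a generic Row (hence fields like [f0]). A minimal sketch of how the output type information could be declared explicitly (assuming Flink's Types.ROW_NAMED from org.apache.flink.api.common.typeinfo; field names and types mirror the setField calls above):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Sketch: declare the process function's output row type explicitly via
// returns(), so downstream schema derivation sees named, typed fields
// instead of a raw generic Row.
DataStream<Row> phasesDurationsDataStream =
        tEnv.toChangelogStream(legalFileEventsTable)
                .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
                .process(new PhaseDurationCounterProcessFunction())
                .returns(Types.ROW_NAMED(
                        new String[] {
                                "id_fascicolo", "nrg", "giudice", "codice_oggetto",
                                "ufficio", "sezione", "fase", "fase_completata", "durata"
                        },
                        Types.LONG, Types.STRING, Types.STRING, Types.STRING,
                        Types.STRING, Types.STRING, Types.STRING,
                        Types.BOOLEAN, Types.LONG));
```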
After collecting the results from the process function, I convert the DataStream back to a Table and execute the pipeline:

    Table phasesDurationsTable = tEnv.fromChangelogStream(
            phasesDurationsDataStream,
            Schema.newBuilder()
                    .column("id_fascicolo", DataTypes.BIGINT())
                    .column("nrg", DataTypes.STRING())
                    .column("giudice", DataTypes.STRING())
                    .column("codice_oggetto", DataTypes.STRING())
                    .column("ufficio", DataTypes.STRING())
                    .column("sezione", DataTypes.STRING())
                    .column("fase", DataTypes.STRING())
                    .column("fase_completata", DataTypes.BOOLEAN())
                    .column("durata", DataTypes.BIGINT())
                    .primaryKey("id_fascicolo", "fase")
                    .build(),
            ChangelogMode.upsert()
    );
    env.execute();
But during startup I receive this exception:

    Unable to find a field named 'id_fascicolo' in the physical data type derived
    from the given type information for schema declaration. Make sure that the
    type information is not a generic raw type. Currently available fields are: [f0]
It seems that the row information (field names and types) isn't available at that point, so the exception is thrown. I tried invoking env.execute() before the DataStream-to-Table conversion; in that case the job starts, but I get no output if I print phasesDurationsTable. Any suggestions on how to make this work?

Eugenio
