/**
	 * Entry point of the streaming job: reads {@code InputUserEvent} records from Kafka,
	 * prints each one, drops records without a timestamp and writes the remainder as
	 * Parquet files to Azure Blob Storage.
	 *
	 * @param args command-line arguments (not used)
	 * @throws Exception if building the pipeline or executing the job fails
	 */
	public static void main(String[] args) throws Exception {
		// Hand the storage account key to Flink's file system layer before any
		// wasb:// path is resolved.
		Configuration cfg = new Configuration();
		cfg.setString("fs.azure.account.key.<azureStorageAccountName>.blob.core.windows.net",
				"<Storage account key>");
		FileSystem.initialize(cfg, null);

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		var kafkaConsumer = createKafkaConsumer("<topic>", "localhost:9092", "<kafkaGroup>");
		DataStream<InputUserEvent> dataStream = env.addSource(kafkaConsumer);

		// NOTE(review): bulk-format StreamingFileSink part files are only finalized on
		// checkpoints, and this method does not enable checkpointing. Confirm it is
		// configured elsewhere (e.g. cluster configuration), otherwise output files
		// will never be committed.
		StreamingFileSink<InputUserEvent> fileSink = StreamingFileSink
				.forBulkFormat(new Path("wasb://<azureStorageAccountName>@<>.blob.core.windows.net"),
						ParquetAvroWriters.forReflectRecord(InputUserEvent.class))
				.withRollingPolicy(new CustomRollingPolicy())
				.build();

		// Transformations return a new stream; keep the result so the sink consumes the
		// filtered records instead of the raw source stream.
		DataStream<InputUserEvent> validEvents = dataStream.map(item -> {
			System.out.println(item);
			return item;
		}).filter(item -> item.getTimestamp() != null);

		// No try/catch here: a failure while wiring the sink must abort the job rather
		// than be printed and followed by executing a pipeline without a sink.
		validEvents.addSink(fileSink);

		// execute program
		env.execute("Flink Streaming Java API Skeleton");
	}
