12. Flink Basics - Source: Reading from a File

1. Preparing the File

sensor.txt

sensor_1 1547718199 35.8
sensor_6 1547718201 15.4
sensor_7 1547718202 6.7
sensor_10 1547718205 38.1
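The program in section 2 only prints each line as-is, but a later processing step usually needs the individual fields. The following is a minimal sketch, assuming each line holds a sensor id, a Unix timestamp in seconds, and a temperature reading (the original does not name the fields). The hypothetical `SensorLineParser` class is not part of the original program and uses only the Java standard library. It splits a line on whitespace and/or commas, so it also accepts a line with a stray comma such as `sensor_6 1547718201, 15.4`.

```java
/**
 * Hypothetical helper (not part of the original program) that splits one line
 * of sensor.txt into three fields. Assumed field meaning: sensor id,
 * Unix timestamp in seconds, temperature.
 */
class SensorLineParser {

    /** Immutable holder for one parsed line (hypothetical type). */
    static final class SensorReading {
        public final String id;
        public final long timestamp;
        public final double temperature;

        SensorReading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return "SensorReading{id=" + id + ", timestamp=" + timestamp
                    + ", temperature=" + temperature + "}";
        }
    }

    /** Parses lines like "sensor_1 1547718199 35.8"; whitespace and commas both act as separators. */
    static SensorReading parse(String line) {
        String[] fields = line.trim().split("[\\s,]+");
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                    "expected 3 fields but got " + fields.length + ": " + line);
        }
        return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
    }

    public static void main(String[] args) {
        String[] lines = {
            "sensor_1 1547718199 35.8",
            "sensor_6 1547718201, 15.4",
            "sensor_7 1547718202 6.7",
            "sensor_10 1547718205 38.1"
        };
        for (String line : lines) {
            System.out.println(parse(line));
        }
    }
}
```

Inside a Flink job, the same parsing logic could be applied to each element of `dataStream` with a `map` transformation.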

Upload the file to HDFS:

hadoop fs -copyFromLocal sensor.txt /user/hive/warehouse

2. Preparing the Program

SourceTest2_File

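package org.example;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @remark  Flink Source: reading from a file
 */
public class SourceTest2_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with one parallel instance, so print() output carries no subtask prefix
        env.setParallelism(1);
        // Read the file from HDFS; each line becomes one String element of the stream
        DataStream<String> dataStream = env.readTextFile("hdfs://hp1:8020/user/hive/warehouse/sensor.txt");
        // Print every element to standard output
        dataStream.print();
        // Submit and run the job under a readable name
        env.execute("SourceTest2_File");
    }
}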
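`readTextFile` reads the file line by line and emits each line as one `String` element, and `print()` writes each element's `toString()` to standard output. With parallelism 1 the printed lines carry no `n>` subtask prefix. When the job runs on YARN, this output lands in the TaskManager's `.out` log rather than in the client terminal. The plain-Java sketch below approximates that behaviour locally without any Flink dependency, which can help check the file contents before submitting the job. The class name `LocalReadTextFileSketch` and the temporary file are hypothetical, and `Files.readAllLines` only stands in for Flink's own text input format.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical local sketch (no Flink dependency) of what
 * env.readTextFile(path) followed by dataStream.print() does at parallelism 1.
 */
class LocalReadTextFileSketch {

    /**
     * Reads the file line by line (one String per line, line terminators removed),
     * prints every line as-is, and returns the records for inspection.
     */
    static List<String> readAndPrint(Path file) throws IOException {
        List<String> records = Files.readAllLines(file, StandardCharsets.UTF_8);
        // With a single subtask, Flink's print() adds no "n>" prefix,
        // so each record appears exactly as its line in the file.
        for (String record : records) {
            System.out.println(record);
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        // Write a local copy of the sample sensor.txt to a temporary file.
        Path file = Files.createTempFile("sensor", ".txt");
        Files.write(file, Arrays.asList(
                "sensor_1 1547718199 35.8",
                "sensor_6 1547718201 15.4",
                "sensor_7 1547718202 6.7",
                "sensor_10 1547718205 38.1"), StandardCharsets.UTF_8);
        try {
            readAndPrint(file);
        } finally {
            Files.delete(file);
        }
    }
}
```

Running `main` prints the four sample lines unchanged and in file order, which matches what the Flink job is expected to print at parallelism 1.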

3. Running the Flink Program

Run command (submits the jar to a YARN cluster):

flink run -m yarn-cluster -c org.example.SourceTest2_File FlinkStudy-1.0-SNAPSHOT.jar

Screenshot of the run: