# flink ## Prerequisites Install JDK on each node, see "". Get shell script [`install_java_bin`](https://github.com/lasyard/coding/blob/main/shell/install_java_bin.sh). Download the java binary packages: ```console $ curl -LO https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz ``` Check sum: ```console $ sha512sum flink-1.20.0-bin-scala_2.12.tgz 2af8c4d0329df8b139d8ad50ef9179bfed98907a9df7d4411b6e60ff60ca8105f81d07d8d2b7b904e214e68f10c9dfa3616274ca692a2b18de66f3541597a71d flink-1.20.0-bin-scala_2.12.tgz ``` ## Deploy Install the java binary package on each node: ```console $ install_java_bin flink flink-1.20.0-bin-scala_2.12.tgz /opt $ sudo chown ubuntu:ubuntu /opt/flink/log ``` ### Configure Edit file `/opt/flink/conf/config.yaml`: :::{literalinclude} /_files/ubuntu/opt/flink/conf/config.yaml :diff: /_files/ubuntu/opt/flink/conf/config.yaml.orig ::: Edit file `/opt/flink/conf/masters`: :::{literalinclude} /_files/ubuntu/opt/flink/conf/masters :diff: /_files/ubuntu/opt/flink/conf/masters.orig ::: Edit file `/opt/flink/conf/workers`: :::{literalinclude} /_files/ubuntu/opt/flink/conf/workers :diff: /_files/ubuntu/opt/flink/conf/workers.orig ::: These files need to be copied to all nodes to the same path. :::{important} Edit file `/opt/flink/conf/config.yaml` to set the config `taskmanager.host` to the hostname of each node respectively. ::: Create the directory of `${io.tmp.dirs}` on each node: ```console $ sudo mkdir -p /opt/tmp/flink $ sudo chown ubuntu:ubuntu /opt/tmp/flink ``` ### Start a standalone cluster Check the version: ```console $ flink --version SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/flink-1.20.0/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/hadoop-3.4.1/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] Version: 1.20.0, Commit ID: b1fe7b4 ``` Run the following command on the node of JobManager to start the cluster: ```console $ start-cluster.sh Starting cluster. Starting standalonesession daemon on host las0. Starting taskexecutor daemon on host las0. Starting taskexecutor daemon on host las1. Starting taskexecutor daemon on host las2. ``` Show java processes: ```console $ jps -lm 3365748 org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/flink-1.20.0/conf -D taskmanager.memory.network.min=134217730b -D taskmanager.cpu.cores=8.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=134217730b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=536870920b -D taskmanager.memory.task.heap.size=402653174b -D taskmanager.numberOfTaskSlots=8 -D taskmanager.memory.jvm-overhead.max=201326592b 3366233 sun.tools.jps.Jps -lm 3364936 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b --configDir /opt/flink-1.20.0/conf --executionMode cluster ``` Open `http://las0:8081/` in a web browser to see the Flink dashboard. Stop the cluster: ```console $ stop-cluster.sh Stopping taskexecutor daemon (pid: 3365748) on host las0. Stopping taskexecutor daemon (pid: 2304551) on host las1. Stopping taskexecutor daemon (pid: 2204138) on host las2. Stopping standalonesession daemon (pid: 3364936) on host las0. ``` ## Usage ### Batch expample Create a text file `words.txt`: :::{literalinclude} /_files/ubuntu/workspace/words.txt ::: Put the file into hdfs: ```console $ hdfs dfs -put -f words.txt ``` Submit a batch `WordCount` job: ```console $ flink run /opt/flink/examples/batch/WordCount.jar --input hdfs://las0:9000/user/ubuntu/words.txt ... Printing result to stdout. Use --output to specify output path. Job has been submitted with JobID 0850ca7989a26058d20f4dc987d546ae Program execution finished Job with JobID 0850ca7989a26058d20f4dc987d546ae has finished. Job Runtime: 3010 ms Accumulator Results: - c79afe69fc305f3b31d5a614cd556b18 (java.util.ArrayList) [5 elements] (alice,3) (betty,2) (cindy,1) (doris,2) (emily,2) ``` ### Streaming example Use the same `words.txt` file, submit a streaming `WordCount` job: ```console $ flink run /opt/flink/examples/streaming/WordCount.jar --input hdfs://las0:9000/user/ubuntu/words.txt ... Printing result to stdout. Use --output to specify output path. Job has been submitted with JobID e7d0225697a0af2cb47db7b5866a43c4 Program execution finished Job with JobID e7d0225697a0af2cb47db7b5866a43c4 has finished. Job Runtime: 518 ms ``` The output will append to a `.out` file where the task is running. You can find the host on the Flink dashboard web UI. To monitor it: ```console $ tail -f flink-ubuntu-taskexecutor-0-las0.out (alice,1) (betty,1) (alice,2) (cindy,1) (doris,1) (betty,2) (emily,1) (alice,3) (emily,2) (doris,2) ``` ## Run on yarn Environment `HADOOP_CLASSPATH` needs to be set properly in order to run Flink on yarn. To start a Flink yarn session: ```console $ yarn-session.sh --detached ... JobManager Web Interface: http://las2:36373 ... ``` The last lines of the output expose the Flink dashboard URL and the commands to stop the cluster. :::{note} TaskManager will be allocated dynamically based on the running jobs. ::: Now jobs can be submitted to the cluster.