
flink

https://flink.apache.org/

Prerequisites

Install a JDK on each node; see “Install Java Development Environment”.

Get the shell script install_java_bin.

Download the Flink binary package:

$ curl -LO https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz

Verify the checksum:

$ sha512sum flink-1.20.0-bin-scala_2.12.tgz
2af8c4d0329df8b139d8ad50ef9179bfed98907a9df7d4411b6e60ff60ca8105f81d07d8d2b7b904e214e68f10c9dfa3616274ca692a2b18de66f3541597a71d  flink-1.20.0-bin-scala_2.12.tgz
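Comparing the digest by eye is error-prone. A minimal sketch that lets sha512sum do the comparison, assuming GNU coreutils; the function name verify_sha512 is hypothetical:

```shell
#!/usr/bin/env bash
# Hypothetical helper: compare a file's SHA-512 digest with an expected value.
# "sha512sum -c -" reads "<digest>  <file>" lines from stdin, prints
# "<file>: OK" or "<file>: FAILED", and exits non-zero on a mismatch.
verify_sha512() {
  local file="$1" expected="$2"
  printf '%s  %s\n' "$expected" "$file" | sha512sum -c -
}
```

For this release: verify_sha512 flink-1.20.0-bin-scala_2.12.tgz 2af8c4d0329df8b139d8ad50ef9179bfed98907a9df7d4411b6e60ff60ca8105f81d07d8d2b7b904e214e68f10c9dfa3616274ca692a2b18de66f3541597a71d should print "flink-1.20.0-bin-scala_2.12.tgz: OK".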

Deploy

Install the Flink binary package on each node, then make its log directory writable by the ubuntu user:

$ install_java_bin flink flink-1.20.0-bin-scala_2.12.tgz /opt
$ sudo chown ubuntu:ubuntu /opt/flink/log
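The install_java_bin script itself is not shown on this page. The sketch below is a hypothetical stand-in, assuming it unpacks the tarball under the prefix and links /opt/flink to the versioned directory; that assumption is inferred from this page using /opt/flink/log while the running processes report /opt/flink-1.20.0/conf. The name install_tarball is made up for illustration:

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for install_java_bin (assumed behaviour, not the real script):
# unpack TARBALL under PREFIX, then point PREFIX/NAME at the versioned
# directory, e.g. /opt/flink -> /opt/flink-1.20.0.
install_tarball() {
  local name="$1" tarball="$2" prefix="$3" topdir
  # First path component of the first archive entry, e.g. "flink-1.20.0".
  topdir=$(tar -tzf "$tarball" | awk -F/ 'NR == 1 { print $1 }')
  tar -xzf "$tarball" -C "$prefix"
  # -n: replace an existing symlink instead of creating a link inside it.
  ln -sfn "$prefix/$topdir" "$prefix/$name"
}
```

Run it as a user that can write to /opt, and put /opt/flink/bin on PATH (for example, export PATH=/opt/flink/bin:$PATH in ~/.bashrc) so that flink, start-cluster.sh and yarn-session.sh can be called without a path as on the rest of this page. Upgrading then only means unpacking the new version and re-pointing the symlink.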

Configure

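Edit file /opt/flink/conf/config.yaml. The abridged diff below changes jobmanager.bind-host, jobmanager.rpc.address, taskmanager.bind-host, taskmanager.host, taskmanager.numberOfTaskSlots, rest.address, rest.bind-address and io.tmp.dirs: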

   #
   # To enable this, set the bind-host address to one that has access to an outside facing network
   # interface, such as 0.0.0.0.
-  bind-host: localhost
+  bind-host: 0.0.0.0
   rpc:
     # The external address of the host on which the JobManager runs and can be
     # reached by the TaskManagers and any clients which want to connect. This setting
     # the conf/masters file, this will be taken care of automatically. Yarn
     # automatically configure the host name based on the hostname of the node where the
     # JobManager runs.
-    address: localhost
+    address: las0
     # The RPC port where the JobManager is reachable.
     port: 6123
   memory:
   #
   # To enable this, set the bind-host address to one that has access to an outside facing network
   # interface, such as 0.0.0.0.
-  bind-host: localhost
+  bind-host: 0.0.0.0
   # The address of the host on which the TaskManager runs and can be reached by the JobManager and
   # other TaskManagers. If not specified, the TaskManager will try different strategies to identify
   # the address.
   #
   # Note also that unless all TaskManagers are running on the same machine, this address needs to be
   # configured separately for each TaskManager.
-  host: localhost
+  host: las0
   # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
-  numberOfTaskSlots: 1
+  numberOfTaskSlots: 8
   memory:
     process:
       # The total process memory size for the TaskManager.
 
 rest:
   # The address to which the REST client will connect to
-  address: localhost
+  address: las0
   # The address that the REST & web server binds to
   # By default, this is localhost, which prevents the REST & web server from
   # being able to communicate outside of the machine/container it is running on.
   #
   # To enable this, set the bind address to one that has access to outside-facing
   # network interface, such as 0.0.0.0.
-  bind-address: localhost
+  bind-address: 0.0.0.0
   # # The port to which the REST client connects to. If rest.bind-port has
   # # not been specified, then the server will bind to this port as well.
   # port: 8081
 #     # thread. You can include the same directory multiple times in order to create
 #     # multiple I/O threads against that directory. This is for example relevant for
 #     # high-throughput RAIDs.
-#     dirs: /tmp
+#     dirs: /opt/tmp/flink
 
 # classloader:
 #   resolve:
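Because most of config.yaml is commented out, it is easy to edit a value that Flink never reads. A small sketch for listing only the active lines, assuming comments start with "#" at the beginning of a line (inline comments after values are not stripped); active_settings is a hypothetical name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print every line of a YAML config file that is neither
# blank nor a full-line comment, i.e. the settings that are actually active.
active_settings() {
  grep -Ev '^[[:space:]]*(#|$)' "$1"
}
```

Running active_settings /opt/flink/conf/config.yaml should show the new bind-host, address, host and numberOfTaskSlots values; if io.tmp.dirs is missing from the output, it is still inside a commented-out block.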

Edit file /opt/flink/conf/masters:

-localhost:8081
+las0:8081

Edit file /opt/flink/conf/workers:

-localhost
+las0
+las1
+las2

Copy these files (config.yaml, masters and workers) to the same path on all nodes.
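The copy step can be scripted from las0. A minimal sketch, assuming passwordless SSH between the nodes (start-cluster.sh relies on SSH as well), Flink installed at the same path everywhere, and hostname printing the same short name used in the workers file; other_workers and push_conf are hypothetical names:

```shell
#!/usr/bin/env bash
# Hypothetical helpers: push the edited config to every other worker.
CONF_DIR=/opt/flink/conf

# Print hosts from a workers file, skipping blank lines, comments and SELF.
other_workers() {
  local workers_file="$1" self="$2"
  grep -Ev '^[[:space:]]*(#|$)' "$workers_file" | grep -vxF -- "$self"
}

# Copy config.yaml, masters and workers to each remote worker.
push_conf() {
  local node
  for node in $(other_workers "$CONF_DIR/workers" "$(hostname)"); do
    scp "$CONF_DIR/config.yaml" "$CONF_DIR/masters" "$CONF_DIR/workers" \
      "$node:$CONF_DIR/"
  done
}
```

Deriving the host list from the workers file keeps a single source of truth: adding a node to workers is enough for the next push_conf.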

Important

On each node, edit /opt/flink/conf/config.yaml and set taskmanager.host to that node's own hostname (las1 on las1, las2 on las2).

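Create the ${io.tmp.dirs} directory on each node. Note that io.tmp.dirs is commented out in the default config.yaml and the diff above only changes the commented value, so also uncomment the io:, tmp: and dirs: lines for /opt/tmp/flink to take effect: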

$ sudo mkdir -p /opt/tmp/flink
$ sudo chown ubuntu:ubuntu /opt/tmp/flink
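Both per-node steps can be applied from las0 in one pass. A sketch under several assumptions: config.yaml keeps the 2-space indentation of the default file, where "  host:" (exactly two leading spaces) only matches taskmanager.host; passwordless SSH; sudo that does not prompt for a password; GNU sed. The names set_tm_host and setup_nodes are hypothetical:

```shell
#!/usr/bin/env bash
# Hypothetical helpers for the per-node adjustments.
CONF=/opt/flink/conf/config.yaml
TMP_DIR=/opt/tmp/flink
NODES="las0 las1 las2"

# Point taskmanager.host in a config file at the given hostname (GNU sed -i).
set_tm_host() {
  local file="$1" host="$2"
  sed -i "s/^  host: .*/  host: $host/" "$file"
}

# On every node: set its own hostname and create the io.tmp.dirs directory.
# "declare -f" ships the function definition to the remote shell.
setup_nodes() {
  local node
  for node in $NODES; do
    ssh "$node" "$(declare -f set_tm_host); set_tm_host $CONF $node &&
      sudo mkdir -p $TMP_DIR && sudo chown ubuntu:ubuntu $TMP_DIR"
  done
}
```

Anchoring the pattern at exactly two spaces is what keeps bind-host and the nested rpc.address untouched; check the result with grep '^  host:' on each node.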

Start a standalone cluster

Check the version:

$ flink --version
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.20.0/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-3.4.1/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Version: 1.20.0, Commit ID: b1fe7b4

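Run the following command on the JobManager node (las0) to start the cluster; it starts the JobManager locally and, over SSH, a TaskManager on every host listed in conf/workers: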

$ start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host las0.
Starting taskexecutor daemon on host las0.
Starting taskexecutor daemon on host las1.
Starting taskexecutor daemon on host las2.

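List the Java processes on las0; the JobManager runs as StandaloneSessionClusterEntrypoint and the local TaskManager as TaskManagerRunner: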

$ jps -lm
3365748 org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/flink-1.20.0/conf -D taskmanager.memory.network.min=134217730b -D taskmanager.cpu.cores=8.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=134217730b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=536870920b -D taskmanager.memory.task.heap.size=402653174b -D taskmanager.numberOfTaskSlots=8 -D taskmanager.memory.jvm-overhead.max=201326592b
3366233 sun.tools.jps.Jps -lm
3364936 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b --configDir /opt/flink-1.20.0/conf --executionMode cluster

Open http://las0:8081/ in a web browser to see the Flink dashboard.
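The dashboard port also serves Flink's REST API, which is handy for scripted health checks. A sketch, assuming curl is installed, REST_URL matches the rest settings above, and /overview returns single-line JSON that includes fields such as "taskmanagers" and "slots-total"; json_int and wait_for_taskmanagers are hypothetical names:

```shell
#!/usr/bin/env bash
# Hypothetical helpers: wait until TaskManagers have registered with the JobManager.
REST_URL=http://las0:8081

# Read an integer field (e.g. taskmanagers, slots-total) from JSON on stdin.
json_int() {
  sed -n "s/.*\"$1\": *\([0-9][0-9]*\).*/\1/p"
}

# Usage: wait_for_taskmanagers EXPECTED [TRIES]; polls /overview every 2 seconds.
wait_for_taskmanagers() {
  local expected="$1" tries="${2:-30}" i=0 n
  while [ "$i" -lt "$tries" ]; do
    n=$(curl -fsS "$REST_URL/overview" | json_int taskmanagers)
    if [ "${n:-0}" -ge "$expected" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep 2
  done
  return 1
}
```

With three workers and numberOfTaskSlots: 8, wait_for_taskmanagers 3 should succeed shortly after start-cluster.sh, and /overview should then report slots-total as 3 × 8 = 24.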

Stop the cluster:

$ stop-cluster.sh 
Stopping taskexecutor daemon (pid: 3365748) on host las0.
Stopping taskexecutor daemon (pid: 2304551) on host las1.
Stopping taskexecutor daemon (pid: 2204138) on host las2.
Stopping standalonesession daemon (pid: 3364936) on host las0.

Usage

Batch example

Create a text file words.txt:

Alice
Betty
Alice
Cindy
Doris
Betty
Emily
Alice
Emily
Doris

Put the file into HDFS (a relative path resolves to the current user's HDFS home directory, here /user/ubuntu):

$ hdfs dfs -put -f words.txt

Submit a batch WordCount job:

$ flink run /opt/flink/examples/batch/WordCount.jar --input hdfs://las0:9000/user/ubuntu/words.txt
⋮
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 0850ca7989a26058d20f4dc987d546ae
Program execution finished
Job with JobID 0850ca7989a26058d20f4dc987d546ae has finished.
Job Runtime: 3010 ms
Accumulator Results: 
- c79afe69fc305f3b31d5a614cd556b18 (java.util.ArrayList) [5 elements]


(alice,3)
(betty,2)
(cindy,1)
(doris,2)
(emily,2)
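To sanity-check the result, the same counts can be computed locally. A sketch, assuming one word per line as in words.txt and lower-casing words to match the keys in the Flink output; local_wordcount is a hypothetical name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: count words locally and print them in "(word,count)" form.
local_wordcount() {
  tr '[:upper:]' '[:lower:]' < "$1" | LC_ALL=C sort | uniq -c |
    awk 'NF == 2 { printf "(%s,%d)\n", $2, $1 }'
}
```

local_wordcount words.txt should print the same five tuples as the job. Flink does not promise any output order, so sort both sides before comparing them with diff.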

Streaming example

Using the same words.txt file, submit a streaming WordCount job:

$ flink run /opt/flink/examples/streaming/WordCount.jar --input hdfs://las0:9000/user/ubuntu/words.txt
⋮
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID e7d0225697a0af2cb47db7b5866a43c4
Program execution finished
Job with JobID e7d0225697a0af2cb47db7b5866a43c4 has finished.
Job Runtime: 518 ms

The output is appended to the TaskManager's .out file on the node where the task runs; the Flink dashboard shows which host that is. To monitor it, run the following in /opt/flink/log on that node:

$ tail -f flink-ubuntu-taskexecutor-0-las0.out
(alice,1)
(betty,1)
(alice,2)
(cindy,1)
(doris,1)
(betty,2)
(emily,1)
(alice,3)
(emily,2)
(doris,2)
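Unlike the batch job, the streaming job emits an updated count every time a word arrives, so (alice,1), (alice,2) and (alice,3) all appear and only the last one per word is the final result. A sketch that reduces the .out file to those final counts, assuming each line ends in a "(word,count)" tuple (any prefix before it is ignored) and that each word's updates appear in order within the file; final_counts is a hypothetical name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: keep only the latest streaming count for each word.
final_counts() {
  grep -hoE '\([^(),]+,[0-9]+\)$' "$@" |
    awk -F'[(,)]' '{ last[$2] = $3 } END { for (w in last) printf "(%s,%s)\n", w, last[w] }' |
    LC_ALL=C sort
}
```

On the TaskManager node, final_counts /opt/flink/log/flink-*-taskexecutor-*.out should match the batch result. Since the .out file is appended to, output from earlier runs is included too, so read it right after a single run.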

Run on YARN

The environment variable HADOOP_CLASSPATH must be set to the Hadoop classpath in order to run Flink on YARN.

To start a Flink YARN session in detached mode:
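Flink's YARN documentation sets this variable from the Hadoop CLI (export HADOOP_CLASSPATH=`hadoop classpath`). A sketch that wraps that with a check, assuming the hadoop command from the Hadoop installation is on PATH; ensure_hadoop_classpath is a hypothetical name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: export HADOOP_CLASSPATH from "hadoop classpath",
# failing loudly if the hadoop CLI cannot be found.
ensure_hadoop_classpath() {
  if ! command -v hadoop >/dev/null 2>&1; then
    echo "hadoop not found on PATH" >&2
    return 1
  fi
  HADOOP_CLASSPATH=$(hadoop classpath)
  export HADOOP_CLASSPATH
}
```

Calling it from ~/.bashrc makes the variable visible to both yarn-session.sh and flink in every new shell.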

$ yarn-session.sh --detached
⋮
JobManager Web Interface: http://las2:36373
⋮

The last lines of the output show the Flink dashboard URL and the commands to stop the session.

Note

TaskManagers are allocated dynamically, as YARN containers, based on the running jobs.

Now jobs can be submitted to the cluster.
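A sketch of submitting to a specific session and stopping it afterwards. It uses flink run -t yarn-session with -Dyarn.application.id and the echo "stop" | yarn-session.sh -id form from Flink's YARN documentation; the application ID is a placeholder to be taken from the yarn-session.sh output or from yarn application -list. submit_to_session and stop_session are hypothetical names:

```shell
#!/usr/bin/env bash
# Hypothetical helpers for a detached Flink YARN session.
# Usage: submit_to_session APP_ID JAR [JOB ARGS...]
submit_to_session() {
  local app_id="$1"; shift
  flink run -t yarn-session -Dyarn.application.id="$app_id" "$@"
}

# Usage: stop_session APP_ID
# If the session does not respond, yarn application -kill APP_ID is the fallback.
stop_session() {
  local app_id="$1"
  echo "stop" | yarn-session.sh -id "$app_id"
}
```

For example, submit_to_session application_XXXX_YY /opt/flink/examples/batch/WordCount.jar --input hdfs://las0:9000/user/ubuntu/words.txt runs the batch example from above on the session, with application_XXXX_YY replaced by the real ID.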

© Copyright 2026, Lasy.