
Flink on Native Kubernetes Deployment Guide

Cluster Requirements and Dependencies

K8S Cluster Requirements

Deploy the cluster according to the requirements in the official documentation, and create a ServiceAccount with the corresponding permissions (a quick verification sketch follows the list).

  • Kubernetes >= 1.9.
  • KubeConfig, which has access to list, create, delete pods and services, configurable via ~/.kube/config.
  • Enabled Kubernetes DNS.
  • default service account with RBAC permissions to create, delete pods.
  • Service account with permissions to create, edit, delete ConfigMaps.
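
The commands below are an illustrative way to sanity-check the version and DNS requirements; they are not taken from the official docs.

kubectl version                                       # server version should be >= 1.9
kubectl get pods -n kube-system -l k8s-app=kube-dns   # CoreDNS/kube-dns pods should be Running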

Flink Cluster Dependencies

Like Spark, Flink depends on a number of libraries and connectors. For example, highly available checkpoints need HDFS for storage, and batch jobs that write data need the MySQL connector. The required connectors and jar files must be bundled into the image under $FLINK_HOME/lib.

K8S Cluster Configuration

  1. Create the ServiceAccount and attach the image pull secret

    kubectl -n flink create sa flink-sa 
    kubectl -n flink patch serviceaccount flink-sa  -p '{"imagePullSecrets": [{"name": "yourregistrykey"}]}'
    
  2. Configure permissions

    kubectl create rolebinding flink-role-binding --clusterrole=edit --serviceaccount=flink:flink-sa --namespace=flink
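    To confirm the binding took effect, kubectl's impersonation check can be used (a quick sanity check, assuming the namespace and ServiceAccount created above):

    kubectl auth can-i create pods -n flink --as=system:serviceaccount:flink:flink-sa
    kubectl auth can-i create configmaps -n flink --as=system:serviceaccount:flink:flink-sa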
    
  3. Update DNS

    The default ConfigMap is mounted from kube-system:coredns. Update it to forward queries to the self-hosted DNS server at 192.168.0.11 so that internal hostnames can be resolved; after editing, reload CoreDNS as shown after the snippet.

    apiVersion: v1
    data:
      Corefile: |2-
            .:53 {
                ...
                prometheus :9153
                forward . 192.168.0.11
                ...
            }
    kind: ConfigMap
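
    A minimal sketch of applying the change and reloading CoreDNS (assuming both the ConfigMap and the Deployment are named coredns in kube-system):

    kubectl -n kube-system edit configmap coredns               # update the forward directive as above
    kubectl -n kube-system rollout restart deployment coredns   # restart CoreDNS so the new Corefile is loaded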
    
  4. Build the base image

    Package all of the dependencies and jar files above into the image; build-and-push commands follow the Dockerfile.

    FROM flink:1.15.2-java11
    
    # Dependency jars and connectors
    ARG hadoop_jar=flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
    ARG jdbc_jar=flink-connector-jdbc-1.15.2.jar
    ARG mysql_jar=mysql-connector-java-8.0.30.jar
    ARG common_cli=commons-cli-1.5.0.jar
    ARG hadoop_conf=core-site.xml
    ARG hdfs_conf=hdfs-site.xml
    ENV HADOOP_HOME=/hadoop
    RUN mkdir -p $HADOOP_HOME
    COPY --chown=flink:flink $hadoop_conf $HADOOP_HOME
    COPY --chown=flink:flink $hdfs_conf $HADOOP_HOME
    COPY --chown=flink:flink $hadoop_jar $FLINK_HOME/lib/
    COPY --chown=flink:flink $common_cli $FLINK_HOME/lib/
    COPY --chown=flink:flink $mysql_jar $FLINK_HOME/lib/
    COPY --chown=flink:flink $jdbc_jar $FLINK_HOME/lib/
    # Copy the job jars
    RUN mkdir -p $FLINK_HOME/usrlib
    COPY target/Streaming-Job.jar  $FLINK_HOME/usrlib/Streaming-Job.jar
    COPY target/Batch-Job.jar  $FLINK_HOME/usrlib/Batch-Job.jar
    
    USER flink
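
    With the Dockerfile above, building and pushing the image is a standard two-step process. The registry address below is a placeholder; the tag matches the ${release_num} used in the submit command later on.

    docker build -t your-registry.example.com/flink-java11:${release_num} .
    docker push your-registry.example.com/flink-java11:${release_num}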
    

Job Management

Job Submission

Key parameters

  • Resource allocation
  • Prometheus monitoring
  • HDFS
  • High availability (HA)

export HADOOP_CONF_DIR=/hadoop

./bin/flink run-application \
    --detached \
    --parallelism ${parallelism} \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=$task_type \
    -Dkubernetes.container.image=flink-java11:${release_num} \
    -Dkubernetes.jobmanager.cpu=$jm_cpu \
    -Dkubernetes.taskmanager.cpu=$tm_cpu \
    -Dtaskmanager.numberOfTaskSlots=$tm_slot \
    -Dtaskmanager.memory.process.size=${tm_mem}mb \
    -Dtaskmanager.memory.managed.fraction=0.05 \
    -Dtaskmanager.memory.network.fraction=0.05 \
    -Dtaskmanager.memory.jvm-overhead.fraction=0.05 \
    -Djobmanager.memory.process.size=700mb \
    -Djobmanager.memory.enable-jvm-direct-memory-limit \
    -Dfs.default-scheme=hdfs://datacluster \
    -Dmetrics.reporter.promgateway.class=org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter \
    -Dmetrics.reporter.promgateway.hostUrl=http://192.168.0.1:9091 \
    -Dmetrics.reporter.promgateway.jobName=flink \
    -Dmetrics.reporter.promgateway.deleteOnShutdown=true \
    -Dmetrics.reporter.promgateway.randomJobNameSuffix=true \
    -Dmetrics.reporter.promgateway.interval=60  \
    -Dkubernetes.jobmanager.node-selector=task_type:streaming \
    -Dkubernetes.taskmanager.node-selector=task_type:streaming \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.rest-service.exposed.node-port-address-type=ExternalIP \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=hdfs://datacluster/flink/recovery \
    -Drestart-strategy=fixed-delay \
    -Drestart-strategy.fixed-delay.attempts=10 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.service-account=flink-sa \
    -Denv.hadoop.conf.dir=/hadoop \
    -Dkubernetes.container.image.pull-policy=Always \
    -Dflink.hadoop.dfs.client.failover.proxy.provider.datacluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider \
    local:///opt/flink/usrlib/${tm_jar}

To make submission easier, the full command has been wrapped in the script submit_streaming_job.sh; a rough sketch of such a wrapper is shown below.
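
The following is a minimal sketch of what the wrapper might look like, not the actual script; the argument handling, resource defaults, and trimmed option list are assumptions.

#!/usr/bin/env bash
# Hypothetical sketch of submit_streaming_job.sh
# Usage: bash submit_streaming_job.sh <task_type> <release_num> [parallelism]
set -euo pipefail

task_type=$1
release_num=$2
parallelism=${3:-2}

# Assumed per-job resource defaults
jm_cpu=0.5; tm_cpu=1; tm_slot=2; tm_mem=2048
tm_jar=Streaming-Job.jar

export HADOOP_CONF_DIR=/hadoop

# NOTE: the remaining -D options from the full command above (HA, HDFS,
# Prometheus, node selectors) should be appended here as well.
./bin/flink run-application \
    --detached \
    --parallelism ${parallelism} \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=$task_type \
    -Dkubernetes.container.image=flink-java11:${release_num} \
    -Dkubernetes.jobmanager.cpu=$jm_cpu \
    -Dkubernetes.taskmanager.cpu=$tm_cpu \
    -Dtaskmanager.numberOfTaskSlots=$tm_slot \
    -Dtaskmanager.memory.process.size=${tm_mem}mb \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.service-account=flink-sa \
    local:///opt/flink/usrlib/${tm_jar}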

Job Cancellation

With HA enabled, the job_id is fixed at 00000000000000000000000000000000:

./bin/flink cancel --target kubernetes-application \
-Dkubernetes.namespace=flink \
-Dkubernetes.service-account=flink-sa \
-Dkubernetes.cluster-id=$cluster_id $job_id

Listing Jobs

./bin/flink list --target kubernetes-application \
-Dkubernetes.namespace=flink \
-Dkubernetes.service-account=flink-sa \
-Dkubernetes.cluster-id=$cluster_id

Common Issues

  • Updating code by simply pushing a new image does not work: the jobmanager is actually a standalone pod and its configuration lives in a ConfigMap, so updating the image of the deployment will not recreate the jobmanager pod (see the related discussion). The correct approach is to cancel the job first and then re-submit it:

    bash cancel_streaming_job.sh task1 && bash submit_streaming_job.sh task1 $RELEASE_NUM
    
  • External Prometheus monitoring requires pod auto-discovery; scraping through the apiserver proxy did not work in our tests, so the Pushgateway approach is used instead. The drawback is that stale data has to be cleaned up manually (a cleanup sketch follows the options below). In the current version, the backpressure-related metrics also turned out to be inaccurate in our tests.

     -Dmetrics.reporter.promgateway.hostUrl=http://192.168.0.1:9091 \
     -Dmetrics.reporter.promgateway.jobName=flink \
     -Dmetrics.reporter.promgateway.deleteOnShutdown=true \
     -Dmetrics.reporter.promgateway.randomJobNameSuffix=true \
     -Dmetrics.reporter.promgateway.interval=60  \
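
    A minimal cleanup sketch, assuming Pushgateway >= 0.10 (for the /api/v1/metrics JSON endpoint) and jq installed; it deletes every metric group whose job label starts with "flink", so it is best run when no jobs are actively pushing.

    PGW=http://192.168.0.1:9091
    # List all pushed metric groups, pick the flink* job names, and delete them one by one.
    for job in $(curl -s $PGW/api/v1/metrics | jq -r '.data[].labels.job' | grep '^flink'); do
        curl -X DELETE $PGW/metrics/job/$job
    done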
    
  • Memory configuration: according to the official docs it is enough to set jobmanager.memory.process.size and taskmanager.memory.process.size, and the remaining pools are derived automatically. In practice, though, some of the default fractions are too large and lead to errors about values falling outside the configured min/max range, so defining the sizes explicitly is more reliable. To make the calculation easier, the formulas are summarized below (all values in MB; a worked example follows the list):

    • Fixed defaults
      • JVM Metaspace = 256
      • JVM Heap / Framework Heap = 128
      • Direct Memory / Framework Off-Heap = 128
    • Derived from default fractions
      • JVM Overhead = max(Total Process Memory * JVM Overhead fraction, 192), clamped to min-max (192, 1024)
      • Off-Heap Memory / Managed Memory = (Total Process Memory - JVM Metaspace - JVM Overhead) * Managed Memory fraction
      • Direct Memory / Network = (Total Process Memory - JVM Metaspace - JVM Overhead) * Network fraction
    • Automatically derived
      • JVM Heap / Task Heap = Total Process Memory - JVM Metaspace - JVM Overhead - Framework Heap - Framework Off-Heap - Task Off-Heap - Managed Memory - Network
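
    A worked example, assuming taskmanager.memory.process.size=2048mb and the three 0.05 fractions from the submit command above (the numbers are illustrative):

    # Sketch of the memory breakdown for total = 2048 MB and fractions = 0.05.
    awk 'BEGIN {
      total = 2048
      metaspace = 256
      overhead = total * 0.05                                  # 102.4
      if (overhead < 192) overhead = 192                       # clamped to the (192, 1024) range
      flink = total - metaspace - overhead                     # Total Flink Memory = 1600
      managed = flink * 0.05                                   # 80
      network = flink * 0.05                                   # 80
      task_heap = flink - 128 - 128 - 0 - managed - network    # framework heap/off-heap = 128, task off-heap = 0
      printf "Task Heap = %.0f MB\n", task_heap                # prints: Task Heap = 1184 MB
    }'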

References

  1. https://www.alibabacloud.com/blog/packaging-issues-in-datastream-development_598837
  2. https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/