K8s Log Collection Solution

K8s Logging Approach

There are currently three mainstream approaches to K8s log collection in the industry; the solution below combines ideas from several of them.

Adopted pipeline: logs written to stdout/stderr + Fluentd + Kafka + Logstash + Elasticsearch + Kibana

(Figure: ELK pipeline architecture)

Deploy Fluentd First

  • Have application logs go to stdout/stderr, then run a DaemonSet on every node to collect them
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  #------ System settings ------
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  #------ Kubernetes container log collection ------
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      tag raw.kubernetes.*
      read_from_head true
      <parse>
        @type multi_format
        <pattern>
          format json
          time_key time
          time_format %Y-%m-%dT%H:%M:%S.%NZ
        </pattern>
        <pattern>
          format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
          time_format %Y-%m-%dT%H:%M:%S.%N%:z
        </pattern>
      </parse>
    </source>
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
    <filter **>
      @id filter_concat
      @type concat
      key message
      multiline_end_regexp /\n$/
      separator ""
    </filter>
    <filter kubernetes.**>
      @id filter_kubernetes_metadata
      @type kubernetes_metadata
    </filter>
    <filter kubernetes.**>
      @id filter_parser
      @type parser
      key_name log
      reserve_data true
      remove_key_name_field true
      <parse>
        @type multi_format
        <pattern>
          format json
        </pattern>
        <pattern>
          format none
        </pattern>
      </parse>
    </filter>
  #------ System log collection ------
  system.input.conf: |-  
    <source>
      @id journald-docker
      @type systemd
      matches [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
        path /var/log/journald-docker.pos
      </storage>
      read_from_head true
      tag docker
    </source>
    <source>
      @id journald-container-runtime
      @type systemd
      matches [{ "_SYSTEMD_UNIT": "{{ fluentd_container_runtime_service }}.service" }]
      <storage>
        @type local
        persistent true
        path /var/log/journald-container-runtime.pos
      </storage>
      read_from_head true
      tag container-runtime
    </source>
    <source>
      @id journald-kubelet
      @type systemd
      matches [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
        path /var/log/journald-kubelet.pos
      </storage>
      read_from_head true
      tag kubelet
    </source>
    <source>
      @id journald-node-problem-detector
      @type systemd
      matches [{ "_SYSTEMD_UNIT": "node-problem-detector.service" }]
      <storage>
        @type local
        persistent true
        path /var/log/journald-node-problem-detector.pos
      </storage>
      read_from_head true
      tag node-problem-detector
    </source>
    <source>
      @id kernel
      @type systemd
      matches [{ "_TRANSPORT": "kernel" }]
      <storage>
        @type local
        persistent true
        path /var/log/kernel.pos
      </storage>
      <entry>
        fields_strip_underscores true
        fields_lowercase true
      </entry>
      read_from_head true
      tag kernel
    </source>
#  forward.input.conf: |-
#    # Takes the messages sent over TCP
#    <source>
#       @type forward
#    </source>
  #------ Output to Kafka ------#
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      #@id kafka
      @type kafka2
      @log_level info
      #include_tag_key true
      # list of seed brokers
      brokers kafka-headless:9092
      use_event_time true
      # buffer settings
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
      # data type settings
      <format>
        @type json
      </format>
      # topic settings
      topic_key topic
      default_topic messages
      # producer settings
      required_acks -1
      compression_codec gzip
    </match>

#  #------ Output to Elasticsearch (alternative, kept commented out) ------
#  output.conf: |-
#    <match **>
#      @id elasticsearch
#      @type elasticsearch
#      @log_level info
#      type_name _doc
#      include_tag_key true
#      host elasticsearch     # change to your own Elasticsearch address
#      port 9200
#      logstash_format true
#      logstash_prefix kubernetes
#      request_timeout    30s
#      <buffer>
#        @type file
#        path /var/log/fluentd-buffers/kubernetes.system.buffer
#        flush_mode interval
#        retry_type exponential_backoff
#        flush_thread_count 5
#        flush_interval 8s
#        retry_forever
#        retry_max_interval 30
#        chunk_limit_size 5M
#        queue_limit_length 10
#        overflow_action block
#        compress gzip               # enable gzip to improve log shipping performance
#      </buffer>
#    </match>
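  • Before going further, create the logging namespace used by every manifest in this guide and load the ConfigMap above; a minimal sketch (the file name fluentd-configmap.yaml is an assumption):
kubectl create namespace logging
kubectl apply -f fluentd-configmap.yaml   # save the ConfigMap above under this (assumed) name
kubectl -n logging get configmap fluentd-config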
  • Configure the RBAC permissions Fluentd needs
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: logging
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        kubernetes.io/cluster-service: "true"
      # This annotation ensures that fluentd is not evicted when a node comes under pressure, supporting the critical-pod annotation based priority scheme.
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        image: harbor.996a.com/public_images/fluentd:v3.0.2
        #image: harbor.996a.com/public_images/fluentd-kafka:v2.0
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
      # Use node labels to control which nodes run the log collector and which do not
      #nodeSelector:
      #  beta.kubernetes.io/fluentd-ds-ready: "true"
      tolerations:
      - operator: Exists
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-config

(Screenshot: fluentd DaemonSet pods)

  • If you run into problems like the one shown above, check whether your configuration is correct; the commands below help narrow things down
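  • A couple of commands confirm the DaemonSet is healthy before digging into the config; a hedged sketch using the labels from the manifests above:
# One fluentd pod should be running on every node
kubectl -n logging get pods -l k8s-app=fluentd-es -o wide
# Tail the collector's own logs to spot configuration errors
kubectl -n logging logs -l k8s-app=fluentd-es --tail=50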

Configure Elasticsearch

  • Since no distributed storage is available yet, local storage is used; it can be replaced with shared storage later
  • The PVs below are pinned to specific nodes, so replace the node names for your environment and create the storage directories on those nodes in advance (see the command sketch after the manifests)
apiVersion: v1
kind: PersistentVolume
metadata:
  name: es-pv-01
  labels:
    type: local
spec:
  capacity:
    storage: 30Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  hostPath:
    path: "/data/k8s/elasticsearch/01"
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - k8s-master-vm1-46
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: es-pv-02
  labels:
    type: local
spec:
  capacity:
    storage: 30Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  hostPath:
    path: "/data/k8s/elasticsearch/02"
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - k8s-master-vm1-46
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: es-pv-03
  labels:
    type: local
spec:
  capacity:
    storage: 30Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  hostPath:
    path: "/data/k8s/elasticsearch/03"
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - k8s-master-vm1-46
---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
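  • As noted above, the hostPath directories must exist on the bound node before the PVs can bind; a minimal sketch (node name and paths follow the manifests above, the file name is an assumption):
# Run on the node the PVs are pinned to (k8s-master-vm1-46 in this example)
mkdir -p /data/k8s/elasticsearch/{01,02,03}
# Apply and verify the PVs and StorageClass
kubectl apply -f es-pv.yaml
kubectl get pv
kubectl get storageclass local-storage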
  • Configure the StatefulSet and Service manifests
  • Elasticsearch is not exposed via NodePort; a headless Service provides DNS names for in-cluster access only
kind: Service
apiVersion: v1
metadata:
  name: elasticsearch
  namespace: logging
  labels:
    app: elasticsearch
spec:
  selector:
   app: elasticsearch
  clusterIP: None
  ports:
   - port: 9200
     name: rest
   - port: 9300
     name: inter
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: es
  namespace: logging
spec:
  serviceName: elasticsearch
  replicas: 3
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels: 
        app: elasticsearch
    spec:
      nodeSelector:
        es: log
      initContainers:
      - name: change-permissions
        image: busybox
        command: ["sh","-c","chown -R 1000:1000 /usr/share/elasticsearch/data"]
        securityContext:
          privileged: true
        volumeMounts:
        - name: data
          mountPath: /usr/share/elasticsearch/data
      - name: increase-vm-max-map
        image: busybox
        command: ["sysctl", "-w", "vm.max_map_count=262144"]
        securityContext:
          privileged: true
      - name: increase-fd-ulimit
        image: busybox
        command: ["sh", "-c", "ulimit -n 65536"]
        securityContext:
          privileged: true
      containers:
      - name: elasticsearch
        image: harbor.996a.com/public_images/elasticsearch:7.6.2
        securityContext:
          privileged: true
        ports:
        - name: rest
          containerPort: 9200
        - name: inter
          containerPort: 9300
        resources:
          limits:
            cpu: 1000m
          requests:
            cpu: 1000m
        volumeMounts:
        - name: data
          mountPath: /usr/share/elasticsearch/data
        env:
        - name: cluster.name
          value: k8s-logs
        - name: node.name
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: cluster.initial_master_nodes
          value: "es-0,es-1,es-2"
        - name: discovery.zen.minimum_master_nodes
          value: "2"
        - name: discovery.seed_hosts
          value: "elasticsearch"
        - name: ES_JAVA_OPTS
          value: "-Xms512m -Xmx512m"
        - name: network.host
          value: "0.0.0.0"
  volumeClaimTemplates:
  - metadata:
      name: data
      labels:
        app: elasticsearch
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: "local-storage"
      resources:
        requests:
          storage: 30Gi
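  • Once the StatefulSet is applied, a quick check confirms that the three-node cluster has formed; a hedged sketch using a temporary port-forward to the first pod:
kubectl -n logging get pods -l app=elasticsearch
kubectl -n logging port-forward es-0 9200:9200 &
curl -s http://localhost:9200/_cluster/health?pretty   # expect "number_of_nodes" : 3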

(Screenshot: Elasticsearch pods running)

Install Kafka with Helm

Depending on your requirements, Kafka can serve clients both inside and outside the K8s cluster; the diagram below shows both options:
1. To serve external clients, the service type of both Kafka and ZooKeeper must first be changed to NodePort or LoadBalancer.
2. Kafka's security mode for external access should also be enabled.

(Diagram: Kafka in-cluster vs. external access)

Serving Kafka inside the K8s cluster

  • Add the Helm repository: helm repo add incubator https://charts.helm.sh/incubator

  • Update the Helm repositories: helm repo update

  • List the repositories: helm repo list

  • Download the Kafka chart's values file: wget https://raw.githubusercontent.com/helm/charts/master/incubator/kafka/values.yaml

replicas: 3
image: "confluentinc/cp-kafka"
imagePullPolicy: "IfNotPresent"
resources: {}
kafkaHeapOptions: "-Xmx1G -Xms1G"
securityContext: {}
updateStrategy:
  type: "OnDelete"
podManagementPolicy: OrderedReady
logSubPath: "logs"
affinity: {}
nodeSelector: {}
readinessProbe:
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5
  successThreshold: 1
  failureThreshold: 3
terminationGracePeriodSeconds: 60
tolerations:
 - key: "node-role.kubernetes.io/master"
   operator: "Exists"
   effect: "NoSchedule"
headless:
  port: 9092
external:
  enabled: false
  type: NodePort
  dns:
    useInternal: false
    useExternal: true
  distinct: false
  servicePort: 19092
  firstListenerPort: 31090
  domain: cluster.local
  loadBalancerIP: []
  loadBalancerSourceRanges: []
  init:
    image: "lwolf/kubectl_deployer"
    imageTag: "0.4"
    imagePullPolicy: "IfNotPresent"
podAnnotations: {}
podLabels: {}
podDisruptionBudget: {}
configurationOverrides: {}
envOverrides: {}
additionalPorts: {}
persistence:
  enabled: true
  size: "1Gi"
  mountPath: "/opt/kafka/data"
  storageClass: local-storage
jmx:
  configMap:
    enabled: true
    overrideConfig: {}
    overrideName: ""
  port: 5555
  whitelistObjectNames:
  - kafka.controller:*
  - kafka.server:*
  - java.lang:*
  - kafka.network:*
  - kafka.log:*
prometheus:
  jmx:
    enabled: false
    image: solsson/kafka-prometheus-jmx-exporter@sha256
    imageTag: a23062396cd5af1acdf76512632c20ea6be76885dfc20cd9ff40fb23846557e8
    interval: 10s
    scrapeTimeout: 10s
    port: 5556
    resources: {}
  kafka:
    enabled: false
    image: danielqsj/kafka-exporter  
    imageTag: v1.2.0
    interval: 10s
    scrapeTimeout: 10s
    port: 9308
    resources: {}
    tolerations: []
    affinity: {}
    nodeSelector: {}
  operator:
    enabled: false
    serviceMonitor:
      namespace: monitoring
      releaseNamespace: false
      selector:
        prometheus: kube-prometheus
    prometheusRule:
      enabled: false
      namespace: monitoring
      releaseNamespace: false
      selector:
        prometheus: kube-prometheus
      rules:
      - alert: KafkaNoActiveControllers
        annotations:
          message: The number of active controllers in {{ "{{" }} $labels.namespace {{ "}}" }} is less than 1. This usually means that some of the Kafka nodes aren't communicating properly. If it doesn't resolve itself you can try killing the pods (one by one whilst monitoring the under-replicated partitions graph).
        expr: max(kafka_controller_kafkacontroller_activecontrollercount_value) by (namespace) < 1
        for: 5m
        labels:
          severity: critical
      - alert: KafkaMultipleActiveControllers
        annotations:
          message: The number of active controllers in {{ "{{" }} $labels.namespace {{ "}}" }} is greater than 1. This usually means that some of the Kafka nodes aren't communicating properly. If it doesn't resolve itself you can try killing the pods (one by one whilst monitoring the under-replicated partitions graph).
        expr: max(kafka_controller_kafkacontroller_activecontrollercount_value) by (namespace) > 1
        for: 5m
        labels:
          severity: critical
configJob:
  backoffLimit: 6
topics: []
testsEnabled: true
zookeeper:
  enabled: true
  resources: ~
  env:
    ZK_HEAP_SIZE: "1G"
  persistence:
    enabled: false
  image:
    PullPolicy: "IfNotPresent"
  url: ""
  port: 2181
  • Download the Kafka chart package and extract it: helm fetch incubator/kafka --untar
# Chart package file structure
[root@k8s-master-vm1-45 kafka]# tree
.
├── charts
│   └── zookeeper
│       ├── Chart.yaml
│       ├── OWNERS
│       ├── README.md
│       ├── templates
│       │   ├── config-jmx-exporter.yaml
│       │   ├── config-script.yaml
│       │   ├── _helpers.tpl
│       │   ├── job-chroots.yaml
│       │   ├── NOTES.txt
│       │   ├── poddisruptionbudget.yaml
│       │   ├── service-headless.yaml
│       │   ├── servicemonitors.yaml
│       │   ├── service.yaml
│       │   └── statefulset.yaml
│       └── values.yaml
├── Chart.yaml
├── README.md
├── requirements.lock
├── requirements.yaml
├── templates
│   ├── configmap-config.yaml
│   ├── configmap-jmx.yaml
│   ├── deployment-kafka-exporter.yaml
│   ├── _helpers.tpl
│   ├── job-config.yaml
│   ├── NOTES.txt
│   ├── podisruptionbudget.yaml
│   ├── prometheusrules.yaml
│   ├── service-brokers-external.yaml
│   ├── service-brokers.yaml
│   ├── service-headless.yaml
│   ├── servicemonitors.yaml
│   ├── statefulset.yaml
│   └── tests
│       └── test_topic_create_consume_produce.yaml
└── values.yaml

5 directories, 33 files
  • Deploy the Kafka chart: helm install kafka incubator/kafka -f values.yaml -n logging
# Check the current helm releases
[root@k8s-master-vm1-45 kafka]# helm  list -n logging
NAME    NAMESPACE       REVISION        UPDATED                                 STATUS          CHART           APP VERSION
kafka   logging         1               2021-03-10 15:24:23.155342161 +0800 CST deployed        kafka-0.21.5    5.0.1
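  • It is also worth confirming that the broker and ZooKeeper pods and services came up; a hedged sketch (resource names follow this chart's defaults, e.g. kafka-headless and kafka-zookeeper):
kubectl -n logging get pods | grep -E 'kafka|zookeeper'
kubectl -n logging get svc | grep -E 'kafka|zookeeper'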

Serving Kafka outside the cluster

  • Modify Kafka's values.yaml

  • Adjust further details in the values file according to your own needs

# Enable the external access mode
external:
  enabled: true
  # type can be either NodePort or LoadBalancer
  type: NodePort
# Kafka broker configuration overrides
configurationOverrides:
  "confluent.support.metrics.enable": false  # Disables confluent metric submission
  # The external port is offset by the broker ID, one port per replica (31090, 31091, 31092, ...)
  "advertised.listeners": |-
    EXTERNAL://<your-host-IP>:$((31090 + ${KAFKA_BROKER_ID}))
  "listener.security.protocol.map": |-
    PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
  • Modify ZooKeeper's values file kafka/charts/zookeeper/values.yaml
service:
  type: NodePort  # changed from ClusterIP so ZooKeeper is reachable from outside the cluster
  • For a fresh install: helm install kafka ./kafka --set external.enabled=true --set external.type=NodePort -n logging

  • If you are only updating an existing release: helm upgrade kafka ./kafka -n logging

  • Below is the test guide printed by the chart's NOTES (can be skipped)

You can connect to Kafka by running a simple pod in the K8s cluster like this with a configuration like this:

  apiVersion: v1
  kind: Pod
  metadata:
    name: testclient
    namespace: logging
  spec:
    containers:
    - name: kafka
      image: confluentinc/cp-kafka:5.0.1
      command:
        - sh
        - -c
        - "exec tail -f /dev/null"

Once you have the testclient pod above running, you can list all kafka
topics with:

  kubectl -n logging exec testclient -- ./bin/kafka-topics.sh --zookeeper kafka-zookeeper:2181 --list

To create a new topic:

  kubectl -n logging exec testclient -- ./bin/kafka-topics.sh --zookeeper kafka-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1

To listen for messages on a topic:

  kubectl -n logging exec -ti testclient -- ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test1 --from-beginning

To stop the listener session above press: Ctrl+C

To start an interactive message producer session:
  kubectl -n logging exec -ti testclient -- ./bin/kafka-console-producer.sh --broker-list kafka-headless:9092 --topic test1

To create a message in the above session, simply type the message and press "enter"
To end the producer session try: Ctrl+C

If you specify "zookeeper.connect" in configurationOverrides, please replace "kafka-zookeeper:2181" with the value of "zookeeper.connect", or you will get error.


### Connecting to Kafka from outside Kubernetes

You have enabled the external access feature of this chart.

**WARNING:** By default this feature allows Kafka clients outside Kubernetes to
connect to Kafka via NodePort(s) in `PLAINTEXT`.

Please see this chart's README.md for more details and guidance.

If you wish to connect to Kafka from outside please configure your external Kafka
clients to point at the following brokers. Please allow a few minutes for all
associated resources to become healthy.

  kafka.cluster.local:31090
  kafka.cluster.local:31091
  kafka.cluster.local:31092
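  • Before testing from a host it can help to verify that the per-broker NodePort services exist and answer; a hedged sketch (service naming depends on the chart version, and <node-ip> is a placeholder):
kubectl -n logging get svc | grep -i external
nc -zv <node-ip> 31090   # repeat for 31091 and 31092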
  • Pick any host machine to test from

    • Download the Kafka binaries and extract them: wget https://apache.website-solution.net/kafka/2.7.0/kafka_2.12-2.7.0.tgz
    • cd into the bin directory of the extracted package
    • Produce messages: ./kafka-console-producer.sh --broker-list <host-ip>:31090,<host-ip>:31091,<host-ip>:31092 --topic <your-topic>
    • Consume the topic from the latest offset: ./kafka-console-consumer.sh --bootstrap-server <host-ip>:31092 --topic <your-topic> --offset latest --partition 0
  • Producer

(Screenshot: producer session)

  • Consumer

(Screenshot: consumer session)

Deploy kafka-manager

  • In short, a web UI for managing Kafka
apiVersion: v1
kind: Service
metadata:
  name: kafka-manager
  namespace: logging
  labels:
    app: kafka-manager
spec:
  type: NodePort
  ports:
  - name: kafka
    port: 9000
    targetPort: 9000
    nodePort: 30900
  selector:
    app: kafka-manager
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-manager
  namespace: logging
  labels:
    app: kafka-manager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-manager
  template:
    metadata:
      labels:
        app: kafka-manager
    spec:
      containers:
      - name: kafka-manager
        image: zenko/kafka-manager:1.3.3.22
        imagePullPolicy: IfNotPresent
        ports:
        - name: kafka-manager
          containerPort: 9000
          protocol: TCP
        env:
        - name: ZK_HOSTS
          value: "kafka-zookeeper-headless:2181"
        livenessProbe:
          httpGet:
            path: /api/health
            port: kafka-manager
        readinessProbe:
          httpGet:
            path: /api/health
            port: kafka-manager
        resources:
          limits:
            cpu: 500m
            memory: 512Mi
          requests:
            cpu: 250m
            memory: 256Mi
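  • Apply the manifest and open the UI on NodePort 30900, then register the cluster using the ZooKeeper address from ZK_HOSTS above; a minimal sketch (the file name kafka-manager.yaml is an assumption):
kubectl apply -f kafka-manager.yaml
kubectl -n logging get pods -l app=kafka-manager
# Then browse to http://<node-ip>:30900 and add a cluster whose ZooKeeper hosts
# point at kafka-zookeeper-headless:2181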

Deploy Logstash

  • Consume the data buffered in Kafka and ship it to Elasticsearch
kind: ConfigMap
apiVersion: v1
metadata:
  name: logstash-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  kafkaInput_consumer.conf: |-
    input {
        kafka {
            bootstrap_servers => ["kafka-headless:9092"]
            client_id => "fluentd"
            group_id => "fluentd"
            consumer_threads => 1
            auto_offset_reset => "latest"
            topics => ["messages"]
            codec => "json"
        }
    }
    filter {
      grok {
        match => ["message", "(?<timestamp>%{MONTHDAY}-%{MONTH}-%{YEAR} %{TIME}) queries: client %{IPV4:c_ip}#%{NUMBER:c_port}: query: %{NOTSPACE:queryrec} %{NOTSPACE:dnsclass} %{NOTSPACE:dnstype} \+ \(%{IPV4:dnsbind}\)"]
      }
      if [type] == "zeus-logger" {
        date {
          match => ["timestamp", "UNIX_MS"]
        }
      }
    }
    output {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "kubernetes_%{+YYYY_MM_dd}"
      }
      if [type] == "zeus-logger" {
        if [indexType] == "DAY" {
          elasticsearch {
            action => "index"
            hosts => ["elasticsearch:9200"] # fill in your actual ES node addresses
            index => "zeus-logger-%{[appName]}-%{+YYYY.MM.dd}"
          }
        } else if [indexType] == "MONTH" {
          elasticsearch {
            action => "index"
            hosts => ["elasticsearch:9200"] # fill in your actual ES node addresses
            index => "zeus-logger-%{[appName]}-%{+YYYY.MM}"
          }
        } else {
          elasticsearch {
            action => "index"
            hosts => ["elasticsearch:9200"] # fill in your actual ES node addresses
            index => "zeus-logger-%{[appName]}-%{+YYYY}"
          }
        }
      }
    }
  • Configure the Deployment
kind: Deployment
apiVersion: apps/v1
metadata:
  labels:
    elastic-app: logstash
  name: logstash
  namespace: logging
spec:
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      elastic-app: logstash
  template:
    metadata:
      labels:
        elastic-app: logstash
    spec:
      containers:
        - name: logstash
          image: harbor.996a.com/public_images/logstash:7.11.1 
          volumeMounts:
          - mountPath: /usr/share/logstash/config
            name: config-volume
          command: ["/bin/sh","-c"]
          args: ["/usr/share/logstash/bin/logstash -f /usr/share/logstash/config/indexer-kafka-named-k8s.conf"]
      volumes:
        - name: config-volume
          configMap:
            name: logstash-config
            items:
            - key: kafkaInput_consumer.conf
              path: indexer-kafka-named-k8s.conf
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
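  • With Logstash running, data should start flowing from the messages topic into Elasticsearch; a hedged sketch to confirm the pipeline end to end:
kubectl -n logging logs deploy/logstash --tail=50        # check for Kafka connection or pipeline errors
kubectl -n logging port-forward es-0 9200:9200 &
curl -s 'http://localhost:9200/_cat/indices/kubernetes_*?v'   # the kubernetes_<date> index should appear once logs arrive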

Deploy the Visualization UI (Kibana)

apiVersion: v1
kind: Service
metadata:
  name: kibana
  namespace: logging
  labels:
    app: kibana
spec:
  ports:
  - port: 5601
  type: NodePort
  selector:
    app: kibana

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana
  namespace: logging
  labels:
    app: kibana
spec:
  selector:
    matchLabels:
      app: kibana
  template:
    metadata:
      labels:
        app: kibana
    spec:
      nodeSelector:
        es: log
      containers:
      - name: kibana
        image: docker.elastic.co/kibana/kibana:7.6.2
        resources:
          limits:
            cpu: 500m
          requests:
            cpu: 200m
        env:
        - name: ELASTICSEARCH_HOSTS
          value: http://elasticsearch:9200
        ports:
        - containerPort: 5601
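Kibana is exposed through a random NodePort; once the pod is ready, open that port on any node and create an index pattern matching the Logstash output. A minimal sketch (the file name kibana.yaml is an assumption):
kubectl apply -f kibana.yaml
kubectl -n logging get svc kibana    # note the NodePort mapped to 5601
# Then browse to http://<node-ip>:<nodeport> and create an index pattern such as kubernetes_*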
