การประมวลผลข้อมูล

ในปัจจุบันนี้การใช้งานข้อมูลในธุรกิจต่าง ๆ เป็นเรื่องที่จำเป็นอย่างมาก การนำเอาข้อมูลมาใช้ และ การประมวลผลข้อมูล จะช่วยให้เรามีการตัดสินใจในการดำเนินธุรกิจได้อย่างมีประสิทธิภาพมากขึ้น และที่สำคัญคือ ถ้าธุรกิจไหนสามารถนำข้อมูลลูกค้า และที่ได้จากลูกค้า มาช่วยให้เข้าใจลูกค้ามากยิ่งขึ้น ตอบสนองความต้องการของลูกค้าได้ดีขึ้นแล้ว ธุรกิจนั้นจะมีพลังในการแข่งขันในตลาดสูงมากขึ้นอีกด้วย

ปัญหาที่พบเจอปัญหาหนึ่งคือว่า เนื่องจากว่าข้อมูลที่เราได้มาในปัจจุบันเริ่มมีมาจากหลายช่องทาง และมีขนาดใหญ่ขึ้นเรื่อย ๆ จนที่เราไม่สามารถที่จะนำมาคำนวณใส่ Excel หรือเขียน Code เพื่อประมวลผลข้อมูลบนเครื่องโน้ตบุ๊กที่เราใช้งานอยู่ได้แล้ว

เครื่องมือที่เป็นตัวเลือกแรก ๆ ที่มีความสามารถในการประมวลผลข้อมูลขนาดใหญ่ คือ Apache Spark นั่นเอง แต่ทีนี้การที่จะใช้งานกับข้อมูลที่มีขนาดใหญ่นั้น แน่นอนว่าเราไม่สามารถที่จะมารันบนเครื่องเราได้อีกแล้ว จำเป็นต้องไปนึกถึงการนำคอมพิวเตอร์หลาย ๆ เครื่องมาช่วยในการประมวลผล โดยส่วนใหญ่ระบบ Cloud จะเป็นตัวเลือกอันดับต้น ๆ เพราะช่วยให้เราสามารถสร้าง และจัดการ Cluster ของคอมพิวเตอร์ได้ภายในไม่กี่นาที โดยเราไม่ต้องมาปวดหัวกับการดูแลรักษาเครื่องคอมพิวเตอร์ต่าง ๆ ที่สร้างขึ้นมา สุดท้ายถ้าเราไม่ใช้งานแล้ว เราก็สามารถ Terminate ไปได้เลย ซึ่งจะช่วยประหยัดเงินไปอีก โดยในบทความนี้จะกล่าวถึงการใช้งานบริการของ AWS ที่ชื่อว่า Amazon EMR

การประมวลผลข้อมูล

อย่างไรก็ดี ขั้นตอนในการประมวลผลข้อมูลมักจะเป็นขั้นตอนที่เราต้องทำซ้ำ ๆ และทำอยู่เรื่อย ๆ เป็นประจำ ดังนั้นการที่เราสามารถสร้าง Workflow ให้การประมวลผลข้อมูลนั้นทำงานได้โดยอัตโนมัติได้ ซึ่งตรงนี้ก็จะสามารถมาช่วยให้เราไม่ต้องไปเสียเวลา หรือไปลงแรงในการทำงานซ้ำ ๆ เราสามารถที่จะนำเอาเวลาทำงานของเราไปเน้นที่การวิเคราะห์ข้อมูล และช่วยตอบโจทย์ต่าง ๆ ทางธุรกิจได้

ดังนั้นในบทความนี้จะมานำเสนอวิธีการสร้าง Automated Workflow ที่จะส่ง Spark Job ขึ้นไปรันบน Amazon EMR Cluster โดยใช้ Apache Airflow ครับ ซึ่งคิดว่าแนวคิดนี้จะสามารถเอาไปประยุกต์ใช้กับงานประมวลผลข้อมูลขนาดใหญ่ด้านอื่น ๆ ได้อีกด้วย

ภาพรวมของ Automated Workflow หรือ Airflow DAG (Directed Acyclic Graph) ของเราจะประกอบไปด้วย 4 ขั้นตอนดังต่อไปนี้

  1. สร้าง EMR Cluster ขึ้นมาใช้งาน
  2. ส่ง Spark Job เข้าไปรันใน EMR Cluster
  3. คอยตรวจสอบว่า Spark Job ที่เราส่งไปทำงานเสร็จหรือยัง
  4. เมื่อ Spark Job ทำงานเสร็จเรียบร้อยแล้ว เราจะสั่ง Terminate ตัว EMR Cluster ที่เราสร้างขึ้นมา

ก่อนเริ่มเราจำเป็นต้องเตรียมสิ่งต่าง ๆ ตามนี้ก่อนครับ

  1. AWS Account
  2. Apache Airflow
  3. [Optional] Awscli

เริ่มต้นเราจะสร้างสิ่งที่เรียกว่า Service Role เตรียมไว้ก่อน เพื่อที่จะให้ Amazon EMR สามารถจัดการ Resource ต่าง ๆ ได้เอง เราสามารถทำตาม Service role for Amazon EMR (EMR role) และ Service role for cluster EC2 instances (EC2 instance profile) หรือจะใช้ Awscli สร้างก็ได้เช่นเดียวกัน โดยรันคำสั่งตามนี้ครับ

aws emr create-default-roles

เสร็จแล้วเราจะสร้าง S3 Bucket เพื่อเก็บ Code ที่เราจะเรียกใช้งาน Spark ไว้ ซึ่งเราจะบอกให้ EMR มาหยิบโค้ดชุดนี้ไปรันใน Cluster ซึ่งเราสามารถเข้าไปสร้าง S3 Bucket ได้ผ่านทาง AWS Console หรือว่าจะใช้คำสั่งของ Awscli ตามด้านล่างนี้

aws s3api create-bucket --bucket my-spark-scripts --region ap-southeast-1

ในบทความนี้ เราจะตั้งชื่อ S3 Bucket ว่า my-spark-scripts ไว้ที่ Region ชื่อ ap-southeast-1 หรือที่ Singapore นั่นเอง (ชื่อ S3 Bucket อาจจะซ้ำกับชื่อของคนอื่นได้)

ในส่วน Code ที่เราจะเตรียมไว้ประมวลผลข้อมูล เราจะเขียน Spark Job ขึ้นมา และเพื่อให้ผู้อ่านได้เห็นภาพก่อนที่จะไปลองดึงข้อมูลจริง ๆ โดยเราจะสร้าง Spark App แบบง่าย ๆ ขึ้นมาเพื่อคำนวณหาเลขคี่ และแสดงผลออกมา 5 ตัวเลข เราจะเขียนไว้ที่ไฟล์ get_odds_numbers.py และมี Code ตามนี้

from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Sample Spark App").getOrCreate()
sc = spark.sparkContext

big_list = range(10000)
rdd = sc.parallelize(big_list)

odds_numbers = rdd.filter(lambda x: x % 2 != 0)

print(odds_numbers.take(5))

เสร็จแล้วก็ให้เราอัพโหลดสคริปของเราไปไว้ที่ S3 Bucket ที่ชื่อ my-sparks-scripts ที่เราสร้างไว้ก่อนหน้านี้ครับ เพราะว่าเดี๋ยวเราจะให้เครื่องใน EMR Cluster มาหยิบไฟล์นี้ไปส่งงานให้กับ Spark

ถัดไปเราจะมาเริ่มแตะในส่วนของ Airflow กัน ซึ่งการที่จะให้ Airflow เชื่อมต่อไปยัง EMR ได้ เราจำเป็นต้องมี Service Account ก่อนครับ ดังนั้นเราจะไปสร้าง AWS User ขึ้นมา โดยให้เราไปที่ AWS Account ของเรา และตอนที่สร้าง User ให้เราเลือก Credential Type เป็น “Access Key – Programmatic Access”

สำหรับบทความนี้จะให้สิทธิ์กับ User เป็นแบบ Administrator Access เลยนะครับ เพื่อความง่ายสำหรับการใช้งานในบทความนี้เท่านั้นนะครับ ซึ่งในการใช้งานจริง ๆ การให้สิทธิ์แบบนี้เป็นสิ่งที่ไม่ควรทำ เราสามารถอ่านเพิ่มเติมเกี่ยวกับการกำหนดสิทธิ์ของ User ได้ที่ Access Management for AWS Resources

หลังจากที่เราสร้างเสร็จ เราจะได้ Access Key ID กับ Secret Access Key มาตามรูปด้านล่าง ให้เราคัดลอกเก็บไว้

ถัดไปให้เราเข้าไปที่ Airflow และสร้าง Airflow Connection ขึ้นมา โดยตั้งชื่อ Connection Id ว่า aws_conn เลือก Connection Type เป็น Amazon Web Services ตรงช่อง Login ให้ใส่ Access Key ID และช่อง Password ให้ใส่ Secret Access Key ที่เราได้มาในขั้นตอนก่อนหน้านี้ และตรง Extra ให้กำหนด Region ลงไปด้วยตามนี้

{"region_name": "ap-southeast-1"}

เราจะสร้างอีก 1 Airflow Connection โดยเลือก Connection Type เป็น Amazon Elastic MapReduce ส่วนค่าต่าง ๆ ให้ปล่อยว่างไว้ เพราะเราไม่ได้ต้องการที่จะ Override ค่า Setting อะไรของ EMR

โดยเราจะใช้ 2 Connections ที่เราสร้างขึ้นมานี้แหละในการเชื่อมต่อกับ AWS และ Amazon EMR

ทีนี้ก็มาถึงเวลาที่เราจะเขียนโค้ดสร้าง Workflow ของเราแล้ว ให้สร้างไฟล์ชื่อว่า submit_spark_job_to_emr_cluster_workflow.py และใช้โค้ดด้านล่างนี้

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils import timezone
AWS_CONN_ID = "aws_conn"
EMR_CONN_ID = "emr_conn"
BUCKET_NAME = "my-spark-scripts"
SCRIPT = "get_odds_numbers.py"
JOB_FLOW_OVERRIDES = {
    "Name": "Getting Odds Numbers Processing",
    "ReleaseLabel": "emr-5.34.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
    "Configurations": [
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"},
                }
            ],
        }
    ],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "SPOT",
                "InstanceRole": "MASTER",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Core - 2",
                "Market": "SPOT",
                "InstanceRole": "CORE",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 2,
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}
SPARK_STEPS = [
    {
        "Name": "Get odds numbers from a big list",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "client",
                "s3://{{ params.bucket_name }}/{{ params.script }}",
            ],
        },
    },
]
LAST_STEP = len(SPARK_STEPS) - 1
default_args = {
    "owner": "Kan Ouivirach",
    "start_date": timezone.datetime(2022, 5, 4),
}
with DAG(
    "submit_spark_job_to_emr_cluster_workflow",
    default_args=default_args,
    schedule_interval=None,
) as dag:
    create_emr_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id=AWS_CONN_ID,
        emr_conn_id=EMR_CONN_ID,
    )
    step_adder = EmrAddStepsOperator(
        task_id="step_adder",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id=AWS_CONN_ID,
        steps=SPARK_STEPS,
        params={
            "bucket_name": BUCKET_NAME,
            "script": SCRIPT,
        },
    )
    step_checker = EmrStepSensor(
        task_id="step_checker",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='step_adder', key='return_value')["
        + str(LAST_STEP)
        + "] }}",
        aws_conn_id=AWS_CONN_ID,
    )
    terminate_emr_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_emr_cluster",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id=AWS_CONN_ID,
    )
    create_emr_cluster >> step_adder >> step_checker >> terminate_emr_cluster

มาลองไล่ดูโค้ดทีละส่วนกัน เริ่มด้วยที่ Airflow Task แรก create_emr_cluster โดยใช้ Airflow Operator ที่ชื่อว่า EmrCreateJobFlowOperator ตรงนี้เราจะสร้าง Job Flow ของ EMR ไว้ โดยกำหนด EMR เวอร์ชัน และ Applications ที่เราจะใช้ ในทีนี้คือ Hadoop กับ Spark ครับ และเราก็ตั้ง Configuration ไว้ว่าเราจะใช้ Python 3 นะ และในส่วนของ Instances นั้นเราจะมี

  • เครื่อง Master Node ขนาด m4.xlarge เป็น Spot Instance จำนวน 1 เครื่อง
  • เครื่อง Worker Node ขนาด m4.xlarge เป็น Spot Instance เช่นเดียวกัน จำนวน 2 เครื่อง

ตรง JobFlowRole และ ServiceRole เราก็ใช้ชื่อตาม Role ที่เราสร้างไว้ในช่วงแรกของบทความ

JOB_FLOW_OVERRIDES = {
    "Name": "Getting Odds Numbers Processing",
    "ReleaseLabel": "emr-5.34.0",
    "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
    "Configurations": [
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"},
                }
            ],
        }
    ],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "SPOT",
                "InstanceRole": "MASTER",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Core - 2",
                "Market": "SPOT",
                "InstanceRole": "CORE",
                "InstanceType": "m4.xlarge",
                "InstanceCount": 2,
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}
with DAG(...) as dag:

    create_emr_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id=AWS_CONN_ID,
    )

หลังจากจบ Task นี้ เราจะได้ EMR Cluster มาครับ

ถัดมาจะเป็น Airflow Task ที่จะเป็นตัวไปสร้างขั้นตอนที่เราอยากจะรันที่ EMR Job โดยเราจะใช้ Airflow Operator ที่ชื่อว่า EmrAddStepsOperator ซึ่งเราจะกำหนดขั้นตอนต่าง ๆ ในตัวแปร SPARK_STEPS ตรงนี้เราสามารถที่จะเพิ่มขั้นตอนไปสั่งให้คัดลอกไฟล์จาก S3 ไปวางไว้ที่ HDFS ก่อน แล้วค่อยส่ง Spark Job เพื่อให้ Spark ไปอ่านไฟล์ที่ HDFS ก่อนได้ หรือจะมีขั้นตอนที่เราคัดลอกไฟล์จาก HDFS ไปวางไว้ที่ S3 เพื่อเตรียมที่จะแชร์ให้กับทีมอื่น ๆ ก็ได้

ในทีนี้ เราจะมีแค่ขั้นตอนเดียวครับ คือการส่ง Spark Job ไปรัน Code ที่เราเขียนไว้ที่เราวางไว้ที่ S3

SPARK_STEPS = [
    {
        "Name": "Get odds numbers from a big list",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "client",
                "s3://{{ params.bucket_name }}/{{ params.script }}",
            ],
        },
    },
]
with DAG(...) as dag:
		...
    step_adder = EmrAddStepsOperator(
        task_id="step_adder",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id=AWS_CONN_ID,
        steps=SPARK_STEPS,
        params={
            "bucket_name": BUCKET_NAME,
            "script": SCRIPT,
        },
    )

และแน่นอนว่าเราส่ง Spark Job ไปรันแล้ว เรายังจะ Terminate Cluster ของเราทันทีเลยไม่ได้ครับ ไม่งั้นเราอาจจะไม่ได้ผลลัพธ์อะไรเลย เพราะ Job ที่เราส่งไปยังรันไม่เสร็จ ดังนั้นเราจะต้องมี Task หนึ่งที่คอยตรวจสอบอยู่เป็นระยะ ๆ ว่างานที่ส่งไปเสร็จหรือยัง เราจะใช้ Airflow Operator ที่ชื่อว่า EmrStepSensor มาทำหน้าที่ในส่วนนี้

LAST_STEP = len(SPARK_STEPS) - 1

with DAG(...) as dag:

		...

    step_checker = EmrStepSensor(
        task_id="step_checker",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='step_adder', key='return_value')["
        + str(LAST_STEP)
        + "] }}",
        aws_conn_id=AWS_CONN_ID,
    )

ในโค้ดข้างต้นจะเห็นได้ว่าเราเช็กที่ LAST_STEP ครับ เพราะว่าปกติแล้วใน SPARK_STEPS เราอาจจะมีหลาย ๆ ขั้นตอนก็ได้ ซึ่งเราจะคอยตรวจสอบที่ขั้นตอนสุดท้ายก็พอในกรณีนี้ ถ้าใครต้องการที่จะตรวจสอบมากกว่า 1 ขั้นตอน ก็สามารถทำได้ครับ โดยเราสร้าง Task เพิ่มมาเพื่อตรวจสอบ และกำหนด Index ให้ตรงกับขั้นตอนที่เราต้องการตรวจสอบใน SPARK_STEPS ก็พอ

ขั้นตอนสุดท้าย เราก็แค่สั่ง Terminate ครับ โดยใช้ Airflow Operator ที่ชื่อว่า EmrTerminateJobFlowOperator

with DAG(...) as dag:
		
		...

    terminate_emr_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_emr_cluster",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id=AWS_CONN_ID,
    )

หลังจากที่เรากำหนด Airflow task ต่าง ๆ เรียบร้อยแล้ว เราก็เอา Task เหล่านั้นมากำหนด Dependency ต่อ

create_emr_cluster >> step_adder >> step_checker >> terminate_emr_cluster

สุดท้ายแล้วเราจะได้หน้าตา Workflow หรือว่า DAG ของเราตามนี้

ให้เราลอง Enable ตัว Workflow ดูครับ แล้วกดปุ่ม Play หรือ Trigger ทางด้านขวามือของรูป

ถ้าเรากลับไปดูที่ AWS Account ของเรา แล้วเข้าไปที่ Amazon EMR เราจะเห็นว่ามี Cluster ที่ชื่อ “Getting Odds Numbers Processing” ได้ถูกสร้างขึ้นมาแล้ว!

ขั้นตอนต่าง ๆ ก็ถูกรันโดยอัตโนมัติ

มีงานที่เราส่งไปก็ถูกรัน แล้วปรับสถานะเป็น Completed เรียบร้อย

เราสามารถเข้าไปตรวจสอบ Spark Job ของเราได้ที่ Spark History Server ครับ โดยกดที่ Link ตามรูปด้านล่างนี้

พอกดแล้วจะมีหน้าเว็บเด้งขึ้นมาอีกหน้าหนึ่ง ซึ่งเป็นหน้าเว็บของ Spark UI ครับ เราสามารถกดเข้ามาดูรายละเอียดของ Job ต่าง ๆ ที่เราส่งมาได้

ทีนี้ถ้าเราย้อนกลับมาดูที่ Airflow บ้าง จะเห็นว่าเรารันครบทุก Task แล้วไม่มีปัญหา

หมายความว่าสุดท้ายแล้ว EMR Cluster ที่เราสร้างไว้ จะถูก Terminate ด้วย

ซึ่งถ้าเราย้อนกลับมาดูที่ Amazon EMR ของเราก็จะเห็นว่า Cluster ได้ถูก Terminate ไปเป็นที่เรียบร้อยแล้ว

ถ้าผู้อ่านอยากจะต่อยอดตัว Automated Workflow นี้ให้ไปประมวลผลข้อมูลขนาดใหญ่ วิธีหนึ่งก็สามารถทำได้คือ เราอาจจะเก็บข้อมูลเหล่านั้นไว้ที่ S3 ก่อน เสร็จแล้วที่ SPARK_STEPS เราสามารถเขียนประมาณนี้ได้

SPARK_STEPS = [
    {
        "Name": "Move raw data from S3 to HDFS",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src=s3://{{ params.BUCKET_NAME }}/{{ params.s3_data }}",
                "--dest={{ params.hdfs_data }}",
            ],
        },
    },
    {
        "Name": "Process data",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "client",
                "s3://{{ params.BUCKET_NAME }}/{{ params.s3_script }}",
            ],
        },
    },
    {
        "Name": "Move cleaned data from HDFS to S3",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src={{ params.hdfs_output }}",
                "--dest=s3://{{ params.BUCKET_NAME }}/{{ params.s3_cleansed }}",
            ],
        },
    },
]

โดยขั้นตอนแรก จะเป็นการก็อบปี้ไฟล์จาก S3 ไปวางไว้ที่ HDFS ก่อน ใช้คำสั่ง s3-dist-sp เสร็จแล้วค่อยส่ง Spark Job และสุดท้ายก็คัดลอกผลลัพธ์จาก HDFS ไปวางไว้ที่ S3

โดยสรุปแล้ว บทความนี้ได้กล่าวถึงการสร้าง Automated Workflow บน Airflow เพื่อสร้าง Amazon EMR Cluster ขึ้นมารัน Spark Job และเมื่อรันเสร็จแล้ว EMR Cluster ที่ถูกสร้างขึ้นจะถูก Terminate โดยอัตโนมัติ หวังว่าบทความนี้จะช่วยให้ได้เห็นภาพมากขึ้น และสามารถเอาไปประยุกต์ใช้งานในธุรกิจของแต่ละคนได้ครับ

เนื้อหาโดย กานต์ อุ่ยวิรัช
ตรวจทานและปรับปรุงโดย นววิทย์ พงศ์อนันต์

Recommended Posts