Điều phối Data Fusion pipelines dễ dàng với Cloud Composer

Tháng Sáu 4, 2021
Nga Pham

Thế giới phân tích dữ liệu (data analytics) dựa vào các đường ống (pipeline) ETL và ELT để thu thập những thông tin chi tiết có ý nghĩa từ dữ liệu. Các kỹ sư dữ liệu và nhà phát triển ETL thường được yêu cầu xây dựng hàng chục đường ống phụ thuộc lẫn nhau như một phần của nền tảng dữ liệu của họ, tuy nhiên việc điều phối, quản lý và giám sát tất cả các đường ống này có thể là một thách thức lớn. Nhưng giờ đây, công việc này có thể trở nên dễ dàng hơn trong Cloud Composer bằng cách sử dụng một tập hợp các toán tử của Cloud Data Fusion.

Các toán tử mới của Cloud Data Fusion cho phép bạn dễ dàng quản lý các đường ống trong Cloud Composer mà không cần phải viết nhiều dòng lệnh. Giờ đây, bạn có thể triển khai, bắt đầu hoặc dừng các đường ống của mình bằng cách sử dụng các toán tử cùng một vài tham số. Điều này giúp bạn tiết kiệm thời gian nhưng vẫn đảm bảo độ chính xác và tính hiệu quả trong quy trình làm việc.

Quản lý đường ống dữ liệu của bạn

Data Fusion là dịch vụ tích hợp dữ liệu đám mây được quản lý hoàn toàn bởi Google Cloud và được xây dựng trên nền tảng mã nguồn mở CDAP, giúp người dùng xây dựng và quản lý các đường ống dữ liệu ETL và ELT thông quan một giao diện đồ họa trực quan. Bằng cách loại bỏ các rào cản mã hóa, các nhà phân tích dữ liệu và người dùng doanh nghiệp đều có thể tham gia cùng các nhà phát triển để quản lý dữ liệu của họ.

Quản lý tất cả các đường ống Data Fusion của bạn có thể là một thách thức. Lấy ví dụ, việc xác định cách thức và thời điểm kích hoạt các đường ống của bạn có thể không đơn giản như tưởng tượng. Trong một số trường hợp, bạn có thể muốn lập lịch chạy đường ống theo định kỳ, nhưng bạn sẽ nhanh chóng nhận ra rằng công việc này có sự phụ thuộc và các hệ thống, quy trình và đường ống khác.

Bạn có thể nhận thấy rằng bạn thường cần phải chờ đợi để chạy đường ống của mình cho đến khi một số điều kiện khác được thỏa mãn, chẳng hạn như nhận được thông điệp Pub/Sub, dữ liệu đến trong một bucket, hoặc trong một nhóm các đường ống mà đường ống này phụ thuộc vào dữ liệu đầu ra của các đường ống khác. Đó chính là khi Cloud Composer trở nên hữu ích.

Cloud Composer được xây dựng trên mã nguồn mở Apache Airflow, là dịch vụ điều phối được quản lý hoàn toàn bởi Google, cho phép người dùng quản lý các đường ống trong toàn bộ nền tảng dữ liệu của mình. Quy trình công việc của Cloud Composer được định cấu hình bằng cách xây dựng các đồ thị vòng có hướng (directed acyclic graph – DAG) bằng Python. Trong khi DAG mô tả tập hợp các nhiệm vụ trong một quy trình công việc nhất định, thì chính các toán tử sẽ xác định những gì được hoàn thiện sau một tác vụ. Bạn có thể xem các toán tử như một mẫu và các toán tử Data Fusion mới này cho phép bạn dễ dàng triển khai, bắt đầu và dừng các đường ống ETL/ELT Data Fusion của mình đơn giản bằng cách cung cấp một vài tham số.

Xem xét một trường hợp cụ thể khi Composer kích hoạt đường dẫn Data Fusion đúng một lần khi có tập tin gửi đến trong Cloud Storage bucket:

1 Data Fusion pipeline.jpg

Các bước làm trên được thực hiện như một chuỗi tác vụ trong Composer. Khi một toán tử được khởi tạo, nó sẽ trở thành tác vụ đơn (single task) trong quy trình làm việc, sau đó toán tử CloudDataFusionStartPipeline được sử dụng để bắt đầu Data Fusion pipeline.

Các toán tử này giúp đơn giản hóa DAG. Thay vì yêu cầu mã nguồn bằng ngôn ngữ Python để gọi API Data Fusion hoặc CDAP, Google Cloud cung cấp toán tử với những chi tiết về pipeline, làm giảm độ phức tạp và cải thiện độ tin cậy trong quy trình làm việc của Cloud Composer.

Điều phối các đường ống

Việc điều phối các đường ống với những toán tử này sẽ hoạt động như thế nào trong thực tế? Dưới đây là một ví dụ về cách bắt đầu một đường ống. các nguyên tắc ở đây có thể dễ dàng được mở rộng để bắt đầu, dừng và triển khai tất cả các đường ốn Data Fusion từ Cloud Composer.

Giả sử có một đối tượng Data Fusion với một đường dẫn đã được triển khai và sẵn sàng hoạt động và bạn cần tạo ra một quy trình Composer để kiểm tra sự tồn tại của một tập tin trong Cloud Storage bucket. Trong tương lai, Google Cloud sẽ thêm một trong các toán tử Data Fusion mới vào Cloud Composer DAG để có thể kích hoạt đường dẫn khi tập tin tới, chuyển vào một tên tập tin mới dưới dạng đối số (runtime argument).

Bây giờ, bạn có thể bắt đầu quy trình Cloud Composer và xem cách thức nó hoạt động.

1. Kiểm tra sự tồn tại của đối tượng trong Cloud Storage bucket

Thêm cảm biến GCSObjectExistenceSensor vào DAG. Sau khi bắt đầu tác vụ này, nó sẽ chờ đợi một đối tượng được tải lên Cloud Storage.

gcs_sensor_task = GCSObjectExistenceSensor(
 task_id="gcs_object_sensor",
    bucket='my_data_bucket',
    object='my_data_01082020.csv',
    dag=dag
)

2. Khởi động Data Fusion pipeline

Sử dụng toán tử CloudDataFusionStartPipelineOperator để bắt đầu một đường dẫn đã được triển khai trong Data Fusion. Tác vụ này được coi là hoàn thành sau khi đường dẫn được khởi động thành công trong Data Fusion.

start_pipeline_task = CloudDataFusionStartPipelineOperator(
 task_id="start_cdf_pipeline",
 location='us-west1',
 pipeline_name =’demo_pipeline',
 instance_name="demo_instance",
 runtime_args={'input_dir':'my_data_01082020.csv'},
 dag=dag
)

Bạn có thể xem thêm airflow documentation để hiểu thêm chi tiết về các tham số cần thiết trong toán tử này.

3. Sắp xếp thứ tự của luồng tác vụ bằng toán tử dịch chuyển bit

Khi DAG này được khởi động, tác vụ gcs_sensor sẽ thực thi đầu tiên. Chỉ khi tác vụ này hoàn tất thì tác vụ start_pipeline mới thực thi.

gcs_sensor_task >> start_pipeline_task

4. Tải DAG của bạn lên Cloud Composer DAG bucket và bắt đầu quy trình làm việc

Bây giờ khi DAG của bạn đã hoàn thiện, hãy nhấp vào liên kết tới thư mục DAGs từ trang đích của Cloud Composer và tải DAG của bạn lên.

2 complete dag.jpg

Nhấp vào liên kết máy chủ Airflow web để khởi động giao diện người dùng Airflow và sau đó kích hoạt DAG bằng cách ấn nút Run.

3 trigger the DAG.jpg

5. Các tác vụ đã được thực thi

Khi tập tin được tải lên bucket nguồn của Google Cloud, Data Fusion sẽ được kích hoạt.

4 source bucket.jpg

Vận hành và điều phối

Giờ đây, khi bạn không còn phải viết các dòng lệnh Python và duy trì các test gọi API Data Fusion, bạn sẽ có nhiều thời gian hơn để tập trung vào những phần khác trong quy trình làm việc của mình. Các toán tử Data Fusion này là một bổ sung tuyệt vời cho bộ các toán tử đã có sẵn trên Google Cloud. Cloud Composer và Airflow cũng hỗ trợ các toán tử cho BigQuery, Cloud Dataflow, Cloud Dataproc, Cloud Datastore và Cloud Pub/Sub, mang đến những tích hợp tuyệt vời hơn trên toàn bộ nền tảng dữ liệu của bạn.

Sử dụng các toán tử Data Fusion mới là một cách thức minh bạch để mang lại DAG đơn giản và dễ đọc trong Cloud Composer. Bằng cách giảm độ phức tạp và loại bỏ các rào cản mã hóa, việc quản lý đường ống ETL và ELT trở nên dễ tiếp cận hơn với các thành viên trong tổ chức của bạn. Bạn có thể xem thêm Airflow documentation để tìm hiểu kỹ hơn về các toán tử mới này.

Nguồn: Google Cloud Blog