GCP : Create your Dataproc Cluster with Spark and Dataproc Metastore Hive

Mahendran
Feb 4, 2024

Google Cloud Dataproc is a fully managed cloud service provided by Google Cloud Platform (GCP) for running Apache Spark, Apache Hadoop, Apache Hive, Apache HBase, and other Big Data frameworks. It allows users to easily create, manage, and scale clusters of virtual machines to process large datasets.

Key Features

  1. Managed Service: Dataproc is a fully managed service that handles cluster provisioning, scaling, and maintenance tasks such as security updates and patches, allowing users to focus on data processing.
  2. Integration with GCP Services: Seamless integration with other GCP services such as BigQuery, Cloud Storage, Pub/Sub, and Dataflow, allowing users to build end-to-end data pipelines.
  3. Cost Efficiency: Offers flexible pricing models including pay-as-you-go and preemptible VMs to optimize costs.
  4. Scalability: Easily scale clusters based on workload demands with autoscaling features for resource optimization.
  5. Compatibility with Big Data Ecosystem: Dataproc supports popular Apache Big Data frameworks such as Spark, Hadoop, Hive, HBase, Pig, and more.
  6. Security: Provides robust security features including encryption, IAM integration, and compatibility with GCP security services such as Cloud KMS and Cloud Identity-Aware Proxy.
  7. Easy Integration with Development Tools: Seamless integration with development tools such as Jupyter Notebook, Apache Zeppelin, and the Google Cloud SDK.

Overall, Google Cloud Dataproc simplifies the process of running and managing big data processing workloads in the cloud, providing users with a scalable, cost-effective, and fully managed solution for their data processing needs.

Overview of Dataproc with Spark and Hive:

Using Apache Spark with a Hive metastore on Google Cloud Dataproc is a common practice for managing and querying structured data. The Hive metastore provides a centralized repository to store metadata for Hive tables, which allows different compute frameworks like Spark to access and query the data using SQL-like syntax.

  1. Create a Dataproc Cluster: Define cluster properties including the number of nodes, machine types, Hive properties, and initialization actions [see the JSON config below].
  2. Prerequisites: A Service Account with the required roles and an existing Dataproc Metastore service.
  3. Enable Hive Metastore: Create a Dataproc Metastore with the Hive thrift endpoint and set the --properties flag with hive:hive.metastore.uris=thrift://<HIVEMETASTORE_HOST>:9083 in the Dataproc config JSON below.
  4. Configure Spark [optional]: You can achieve this by setting the appropriate configurations in the spark-defaults.conf file or through Spark configuration properties when submitting Spark jobs [I chose the latter].
  5. Running Spark Jobs with Hive Support: Add config("hive.metastore.warehouse.dir", "gs://hive-warehouse-dir/hive-warehouse") to the Spark config, as in the sketch below.
  6. Monitoring and Maintenance: Regularly monitor the cluster and Spark jobs using GCP monitoring tools, and perform maintenance tasks as needed.

Overall, integrating Spark with a Hive metastore on Dataproc allows you to leverage the power of both platforms for processing and querying large-scale datasets efficiently.
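To make this concrete, here is a minimal sketch of a Spark application that attaches to the Dataproc Metastore. The endpoint and bucket names are placeholders, and the two .config(...) calls are optional when the cluster already sets the corresponding hive: properties (as in the cluster config in section 1).

import org.apache.spark.sql.SparkSession

object MetastoreSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive-metastore-smoke-test")
      // Optional if hive:hive.metastore.uris is already set as a cluster property.
      .config("hive.metastore.uris", "thrift://<HIVEMETASTORE_HOST>:9083")
      // Optional if hive:hive.metastore.warehouse.dir is already set on the cluster.
      .config("hive.metastore.warehouse.dir", "gs://hive-warehouse-dir/hive-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    // Any database or table registered in the Dataproc Metastore is now visible to Spark SQL.
    spark.sql("SHOW DATABASES").show(truncate = false)
    spark.stop()
  }
}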

1. Create a Dataproc Cluster

1.1 Prerequisites

  1. Service Account: A Service Account with required roles to create the cluster
  2. Add Permissions to the above role
  3. Dataproc Metastore (create a Dataproc Metastore service)
  4. hive:hive.metastore.uris: gcloud metastore services describe <METASTORE_ID> --location <LOCATION_ID> --format="get(endpointUri)"
  5. hive.metastore.warehouse.dir

The cluster request payload (dataproc/dataproc-request-thrift.json) used in the next step:
{
  "projectId": "<project-id>",
  "clusterName": "dataproc-cluster-with-hive-metastore-01",
  "labels": {
    "application-name": "mahendran-data-proc-cluster-01-application-01",
    "mailalias": "mahen-it",
    "dataclassification": "internal",
    "environment": "poc",
    "resourceowner": "mahendran"
  },
  "config": {
    "config_bucket": "data-proc-storage",
    "gce_cluster_config": {
      "metadata": {
        "application": "mahendran-cluster-01-application-01-metadata"
      },
      "service_account": "<some-service-account-sa>@<project-id>.iam.gserviceaccount.com",
      "subnetwork_uri": "default",
      "service_account_scopes": [
        "https://www.googleapis.com/auth/cloud-platform"
      ]
    },
    "master_config": {
      "num_instances": 1,
      "machine_type_uri": "n1-standard-4",
      "disk_config": {
        "boot_disk_type": "pd-standard",
        "boot_disk_size_gb": 30,
        "num_local_ssds": 0
      },
      "accelerators": []
    },
    "worker_config": {
      "num_instances": 2,
      "machine_type_uri": "n1-standard-4",
      "disk_config": {
        "boot_disk_type": "pd-standard",
        "boot_disk_size_gb": 30,
        "num_local_ssds": 0
      },
      "accelerators": []
    },
    "software_config": {
      "imageVersion": "2.0",
      "optional_components": [
        "PRESTO",
        "ZEPPELIN"
      ],
      "properties": {
        "hive:hive.exec.dynamic.partition.mode": "nonstrict",
        "hive:hive.exec.dynamic.partition": "true",
        "hive:hive.support.quoted.identifiers": "none",
        "hive:hive.server2.thrift.port": "10000",
        "hive:hive.metastore.uris": "thrift://<endpoint-uri>:9083",
        "hive:hive.metastore.warehouse.dir": "gs://hive-warehouse-dir/dataproc-metastore/hive-warehouse"
      }
    },
    "endpoint_config": {
      "http_ports": {},
      "enable_http_port_access": true
    },
    "lifecycleConfig": {
      "idleDeleteTtl": "1200s"
    }
  }
}

1.2 Post the request

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @dataproc/dataproc-request-thrift.json \
"https://dataproc.googleapis.com/v1/projects/<project-id>/regions/us-west1/clusters"
RESPONSE: the create call returns a long-running operation; the cluster shows up in the Dataproc console once provisioning completes.

You can remove the zone_uri field and let Dataproc choose the zone:

"zone_uri":"https://www.googleapis.com/compute/v1/projects/data-proc-poc/zones/us-west1-a",

1.3 Verify the Cluster Status

On the Cluster Detail Page:

  1. Check the Worker and Master Nodes
  2. Check the Components

Run Sample Apps

Now it's time to run a few apps on the cluster. As always, start with a WordCount. :)

1. Submit a WordCount Spark Job

Note: You need to replace <some-service-account-sa> and input-data below:

  1. Copy the shakespeare-dataset to gs://input-data/wordcount/
  2. Clone the apache-spark-framework sample Apps
  3. Add spark.jars.packages='com.google.cloud.bigdataoss:gcs-connector:3.0.0' to the job properties
gcloud dataproc jobs submit spark --cluster=mahendran-data-proc-cluster-01 \
--impersonate-service-account=<some-service-account-sa>@data-proc-poc.iam.gserviceaccount.com \
--project=data-proc-poc \
--region=us-west1 \
--jars gs://mahendran-data-proc-storage/spark-app-jar/apache-spark-framework-2.12.0-all.jar \
--class dev.template.spark.WordCount \
--properties=^#^spark.jars.packages='com.google.cloud.bigdataoss:gcs-connector:3.0.0',#"spark.submit.deployMode"="cluster" \
-- gs://input-data/wordcount/shakespeare-dataset/text gs://output-data/wordcount/WordCount_$(date +%F_%T)
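The dev.template.spark.WordCount class comes from the apache-spark-framework sample repository referenced above. If you prefer to roll your own, a minimal equivalent might look like the sketch below; this is an approximation of such an app, not the repository's actual source.

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    // args(0) = input path, e.g. gs://input-data/wordcount/shakespeare-dataset/text
    // args(1) = output path, e.g. gs://output-data/wordcount/WordCount_<timestamp>
    val Array(inputPath, outputPath) = args.take(2)

    val spark = SparkSession.builder().appName("WordCount").getOrCreate()
    import spark.implicits._

    spark.read.textFile(inputPath)              // one row per line of text
      .flatMap(_.toLowerCase.split("\\W+"))     // tokenize into words
      .filter(_.nonEmpty)
      .groupBy("value").count()                 // word -> count
      .write.mode("overwrite").csv(outputPath)  // write the counts back to GCS

    spark.stop()
  }
}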

2. Run a Hive Example with Public Covid Data

2.1 Define Buckets: Input, Output, and Hive Schema

gs://<GCS_BUCKET>/data/input/us-counties-recent.csv
gs://<GCS_BUCKET>/data/output/partitioned-covid-data_$(date +%F)
gs://<EXTERNAL_WAREHOUSE_DIR>

2.2 Copy the Public Covid Data to the Input Bucket

curl -s https://raw.githubusercontent.com/mahen-github/covid-19-data/master/us-counties-recent.csv | gsutil cp - gs://<GCS_BUCKET>/data/input/us-counties-recent-2024.csv

2.3 Run the CovidDataHivePartitioner App

gcloud dataproc jobs submit spark --cluster=mahendran-data-proc-cluster-01 \
--impersonate-service-account=<some-service-account-sa>@data-proc-poc.iam.gserviceaccount.com \
--project=<SOME_PROJECT> \
--region=us-west1 \
--jars gs://mahendran-data-proc-storage/spark-app-jar/apache-spark-framework-2.12.0-all.jar \
--class dev.template.spark.CovidDataHivePartitioner \
--properties=^#^spark.jars.packages='com.google.cloud.bigdataoss:gcs-connector:3.0.0',#"spark.submit.deployMode"="cluster" \
-- gs://input-data/data/input/us-counties-recent.csv gs://output-data/data/output/partitioned-covid-data_$(date +%F) gs://<EXTERNAL_WAREHOUSE_DIR>/data/schema/poc
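Like WordCount, dev.template.spark.CovidDataHivePartitioner lives in the sample repository. Conceptually it does something along the lines of the sketch below: read the CSV, register a database under the external warehouse location, and write the data as a partitioned external Hive table. This is my approximation of the approach, assuming the NYT column layout (date, county, state, fips, cases, deaths).

import org.apache.spark.sql.{SaveMode, SparkSession}

object CovidDataHivePartitioner {
  def main(args: Array[String]): Unit = {
    // args: input CSV, partitioned output path, external warehouse/schema location
    val Array(inputCsv, outputPath, warehouseDir) = args.take(3)

    val spark = SparkSession.builder()
      .appName("CovidDataHivePartitioner")
      .enableHiveSupport()
      .getOrCreate()

    val covid = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(inputCsv)
      .withColumnRenamed("date", "reported_date")

    // Keep the database objects under the external warehouse location.
    spark.sql(s"CREATE DATABASE IF NOT EXISTS public_data LOCATION '$warehouseDir'")

    // Write partitioned Parquet to GCS and register it as an external table in the metastore.
    covid.write
      .mode(SaveMode.Overwrite)
      .partitionBy("reported_date")
      .format("parquet")
      .option("path", outputPath)
      .saveAsTable("public_data.covid")

    spark.stop()
  }
}

In this sketch the partitions are registered by saveAsTable; the msck repair table statement in section 2.5 then refreshes the partition metadata if files are added to the output path outside of Spark.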

2.4 Verify the Output

2.5 Open Zeppelin and Execute the SQL Statements

%sql

show databases;

use public_data;

show tables;

describe table extended public_data.covid;

msck repair table public_data.covid;

select
  reported_date,
  county,
  state,
  fips,
  cases,
  deaths
from public_data.covid;

2.6 Verify the Results

Conclusion:

In summary, Google Cloud Dataproc with Apache Spark and Hive backed by Dataproc Metastore offers a robust solution for processing large datasets efficiently in the cloud. More examples will follow; keep watching the apache-spark-framework repository.

