2024-01-30 | Article | Insights
Learn how to change your BigQuery table schema for free and unlock impressive performance gains in this comprehensive guide. With easy-to-follow code and practical tips, it shows how BigQuery's free table operations can be combined into a cost-effective way to reshape your tables. Wherever you are on your data journey, this approach will help you tune your schema for optimal data processing and storage.
In the official BigQuery documentation, Google offers various ways to modify a table schema. These options include adding a column, renaming one, changing a column's data type and even its mode. However, those built-in solutions do not always cover your business needs and can sometimes lead to an expensive invoice at the end of the month.
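For instance, adding a column is a metadata-only change that the Python client library handles directly. A minimal sketch (project, dataset, table and column names are only illustrative):
from google.cloud import bigquery

client = bigquery.Client()

# Illustrative table name; appending a NULLABLE column is a free, metadata-only change.
table = client.get_table("my_project_id.dataset.demo")
table.schema = list(table.schema) + [bigquery.SchemaField("comment", "STRING")]
client.update_table(table, ["schema"])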
Current solutions for changing a column's data type involve coercion or casting. The first is limited in the coercions it allows, while the second can be costly if the table is huge, as BigQuery is forced to read the entire table and overwrite it.
To dig deeper into the first option: the coercion rules currently allow only a handful of widening conversions between numeric types, such as INT64 to NUMERIC, BIGNUMERIC or FLOAT64. Thus, if you simply need to convert a STRING column to INT64, these existing options are not suitable.
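For context, the casting route usually means rewriting the table in place with a query along these lines (a sketch only; table and column names are illustrative). It works, but the query is billed for scanning and rewriting the full table:
from google.cloud import bigquery

client = bigquery.Client()

# Rewrites every row, so the job is billed on the full table size.
query = """
CREATE OR REPLACE TABLE `my_project_id.dataset.demo` AS
SELECT * REPLACE (CAST(total AS INT64) AS total)
FROM `my_project_id.dataset.demo`
"""
client.query(query).result()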
It is well known that BigQuery's load, copy, export and delete table operations are free; this is documented on the BigQuery pricing page. Taking that into consideration, Digitl Cloud has defined a simple data pipeline in Python that performs a combination of these operations to replace the table schema with the desired one.
Essentially, from a BigQuery table you can perform three free operations: export it to Cloud Storage, copy it, and delete it. Together with an equally free load job, that is all the pipeline needs.
There are different ways to generate the desired JSON schema. However, to keep this guide simple, the emphasis is on the most straightforward approach: open the original table in BigQuery and, inside the Schema section, select all the fields and click 'Copy as JSON'.
The file will look like this:
[
  {
    "name": "id",
    "mode": "NULLABLE",
    "type": "STRING",
    "description": null,
    "fields": []
  },
  {
    "name": "date",
    "mode": "NULLABLE",
    "type": "DATE",
    "description": null,
    "fields": []
  },
  {
    "name": "total",
    "mode": "NULLABLE",
    "type": "STRING",
    "description": null,
    "fields": []
  }
]
Once you save this file locally, changing the type of the specific (problematic) field is a straightforward task. You can refer to the official documentation, where all the BigQuery data types are defined here, to pick the necessary type.
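If you prefer to script the edit, here is a small sketch that flips the total column from the example schema above from STRING to INT64 (the file name file.json matches the invocation shown later):
import json

with open("file.json", "r") as f:
    schema = json.load(f)

# Change the type of the problematic field; "total" comes from the example schema above.
for field in schema:
    if field["name"] == "total":
        field["type"] = "INT64"

with open("file.json", "w") as f:
    json.dump(schema, f, indent=2)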
It's time to put theory into action. The first thing to do is create a Google Cloud Storage bucket. We recommend configuring an object lifecycle rule that deletes files older than one day, to avoid storage costs. More information can be found here.
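As a sketch of that setup with the google-cloud-storage client (project, bucket name and location are placeholders):
from google.cloud import storage

storage_client = storage.Client(project="my_project_id")
bucket = storage_client.create_bucket("my_bucket", location="EU")

# Automatically delete exported files after one day to keep storage costs at zero.
bucket.add_lifecycle_delete_rule(age=1)
bucket.patch()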
With the bucket created, we can proceed to execute the code at the end of this section. To run it, we need to pass a few parameters: the project ID, the dataset ID, the table name, the path to the edited JSON schema file, and the name of the Cloud Storage bucket (without the gs:// prefix).
An example of the code usage is:
python main.py \
--project_id=my_project_id \
--dataset_id=dataset \
--table_name=demo \
--json_file=file.json \
--bucket_name=my_bucket
Full code:
import argparse
import sys
import json

from google.cloud import bigquery
from google.cloud.bigquery import Client

SUFFIX = "_bkp"


def load_json_data(json_file_path):
    """Read the edited table schema from a local JSON file."""
    with open(json_file_path, "r") as file:
        json_data = json.load(file)
    return json_data


def copy_table(
    bigquery_client: Client,
    project_id: str,
    dataset_id: str,
    table_name: str,
):
    """Create a free backup copy of the table under <table_name>_bkp."""
    source_table_id = f"{project_id}.{dataset_id}.{table_name}"
    destination_table_id = f"{project_id}.{dataset_id}.{table_name}{SUFFIX}"
    job = bigquery_client.copy_table(source_table_id, destination_table_id)
    job.result()


def delete_table(
    bigquery_client: Client,
    project_id: str,
    dataset_id: str,
    table_name: str,
):
    """Delete the original table so it can be recreated with the new schema."""
    table_id = f"{project_id}.{dataset_id}.{table_name}"
    bigquery_client.delete_table(table=table_id)
    print("Deleted table '{}'.".format(table_id))


def export_table_to_json_file(
    bigquery_client: Client,
    project_id: str,
    dataset_id: str,
    table_name: str,
    bucket_name: str,
    gcs_location: str = "EU",
) -> None:
    """Export the table to Cloud Storage as newline-delimited JSON (a free extract job)."""
    file_id = f"{project_id}.{dataset_id}.{table_name}.json"
    destination_uri = "gs://{}/{}".format(bucket_name, file_id)
    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
    table_ref = dataset_ref.table(table_id=table_name)
    job_config = bigquery.job.ExtractJobConfig()
    job_config.destination_format = bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON
    extract_job = bigquery_client.extract_table(
        table_ref,
        destination_uri,
        job_config=job_config,
        location=gcs_location,
    )
    extract_job.result()
    print("extract_job", extract_job)


def get_table_schema_def(
    bigquery_client: Client,
    project_id: str,
    dataset_id: str,
    table_name: str,
) -> list:
    """Helper: return the current table schema in its JSON (API) representation."""
    table_id = f"{project_id}.{dataset_id}.{table_name}"
    table = bigquery_client.get_table(table_id)
    json_schema_list = [f.to_api_repr() for f in table.schema]
    print(f"Got table '{table.project}.{table.dataset_id}.{table.table_id}'.")
    return json_schema_list


def load_table_uri_json(
    bigquery_client: Client,
    project_id: str,
    dataset_id: str,
    table_name: str,
    schema: list,
    bucket_name: str,
    gcs_location: str,
) -> None:
    """Load the exported JSON file back into a table that uses the new schema."""
    table_id = f"{project_id}.{dataset_id}.{table_name}"
    # The original table has already been deleted, so read the partitioning
    # settings from the backup copy.
    original_table = bigquery_client.get_table(table_id + SUFFIX)
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        time_partitioning=original_table.time_partitioning,
    )
    file_id = f"{table_id}.json"
    uri = "gs://{}/{}".format(bucket_name, file_id)
    load_job = bigquery_client.load_table_from_uri(
        uri,
        table_id,
        location=gcs_location,
        job_config=job_config,
    )
    load_job.result()
    destination_table = bigquery_client.get_table(table_id)
    print("Loaded {} rows.".format(destination_table.num_rows))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Process data with provided parameters"
    )
    parser.add_argument("--project_id", help="Project ID")
    parser.add_argument("--dataset_id", help="Dataset ID")
    parser.add_argument("--table_name", help="Table Name")
    parser.add_argument("--json_file", help="JSON File")
    parser.add_argument("--bucket_name", help="GCS Bucket Name (without gs://)")
    args = parser.parse_args()

    if not all(
        [
            args.project_id,
            args.dataset_id,
            args.table_name,
            args.json_file,
            args.bucket_name,
        ]
    ):
        print(
            "Usage: $ poetry run python main.py "
            "--project_id= --dataset_id= --table_name= "
            "--json_file= --bucket_name="
        )
        sys.exit(1)

    new_table_schema = load_json_data(json_file_path=args.json_file)
    bigquery_client = bigquery.Client()

    # 1. Export the table to Cloud Storage as newline-delimited JSON.
    export_table_to_json_file(
        bigquery_client=bigquery_client,
        project_id=args.project_id,
        dataset_id=args.dataset_id,
        table_name=args.table_name,
        bucket_name=args.bucket_name,
    )
    # 2. Keep a backup copy of the table (<table_name>_bkp).
    copy_table(
        bigquery_client=bigquery_client,
        project_id=args.project_id,
        dataset_id=args.dataset_id,
        table_name=args.table_name,
    )
    # 3. Delete the original table.
    delete_table(
        bigquery_client=bigquery_client,
        project_id=args.project_id,
        dataset_id=args.dataset_id,
        table_name=args.table_name,
    )
    # 4. Reload the exported data into a new table with the desired schema.
    load_table_uri_json(
        bigquery_client=bigquery_client,
        project_id=args.project_id,
        dataset_id=args.dataset_id,
        table_name=args.table_name,
        schema=new_table_schema,
        bucket_name=args.bucket_name,
        gcs_location="EU",
    )
At the beginning of the code there are some helper functions that keep the script readable; let's skip those and focus on the main flow.
Here's a breakdown of the script's main components:
- load_json_data reads the edited schema definition from the local JSON file.
- export_table_to_json_file runs a free extract job that writes the table to the Cloud Storage bucket as newline-delimited JSON.
- copy_table creates a free backup copy of the table with the _bkp suffix.
- delete_table removes the original table.
- get_table_schema_def is a small helper that returns the current schema in its JSON representation, which is handy for generating the initial schema file.
- load_table_uri_json loads the exported file back into a table with the original name, applying the new schema and reusing the time partitioning settings taken from the backup copy.
- The __main__ block parses the command-line arguments and runs these steps in order.
Overall, this script automates the process of changing a BigQuery table's schema. It does so by exporting the table to Cloud Storage, keeping a backup copy, deleting the original table, and then reloading the exported data with the updated schema. It streamlines the process by taking the necessary details as command-line arguments.
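As a quick sanity check after a run, you can confirm that the new type took effect (the table name is illustrative):
from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("my_project_id.dataset.demo")
for field in table.schema:
    print(field.name, field.field_type)
# Expect the "total" column to now be INT64 instead of STRING.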