
Transfer Data from BigQuery to MongoDB

Hussain Poonawala
January 30, 2021

In this article, I'll introduce the problem, the approaches we thought of, and the one we decided to use.

Problem: We wanted to build a recommendation engine, but calculating the scores would be computationally heavy (MapReduce and aggregation queries in real time would not be of much help). So we wanted to use the capabilities of BigQuery and keep the scores/recommendations ready on the DB side. We could then update the recommendations every day by running a job.

For this, we came up with three approaches:

  1. Retrieve the data from BigQuery and export it as CSV file(s) to a common location. A scheduler job then processes those files through the ETL pipeline and persists them into a MongoDB collection.
  2. Create an Airflow job that extracts the data from BigQuery, transforms it as per the requirement, and loads it into the MongoDB collection.
  3. Last but not least, use the BigQuery client SDK to retrieve the required data from BigQuery, then transform and insert the data into MongoDB.

We decided to go ahead with the 3rd approach.

Step 1: Ready the query to get the required data from BigQuery
For this blog, I am using a public sample dataset, retrieving all the data from the table, and inserting it into MongoDB.
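The query itself is a plain SELECT over the public FDA food events table; it is the same query that is passed to the client library in Step 4:

SELECT * FROM `bigquery-public-data.fda_food.food_events`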

Step 2: Initial Set Up to use the client library
Follow all the steps in the "Before you begin" section of the BigQuery client libraries quickstart (see the references below).
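In practice, this mainly means enabling the BigQuery API and making service account credentials available to the application. The code in Step 4 uses the default instance, which picks up the GOOGLE_APPLICATION_CREDENTIALS environment variable; if you prefer to wire the credentials explicitly, a minimal sketch (the key file path and project id are placeholders) would be:

// Build a BigQuery client with explicit service account credentials
// instead of relying on GOOGLE_APPLICATION_CREDENTIALS.
GoogleCredentials credentials =
    GoogleCredentials.fromStream(new FileInputStream("/path/to/service-account-key.json"));
BigQuery bigquery = BigQueryOptions.newBuilder()
    .setProjectId("project-id")
    .setCredentials(credentials)
    .build()
    .getService();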

Step 3: Install the client library
We will be using Maven, so we need to add the following dependencies to the pom:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>16.2.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquery</artifactId>
  </dependency>
</dependencies>
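The insertion code later in this post uses Spring Data's MongoTemplate and Lombok's builder/logging annotations. Assuming a Spring Boot project, the pom would also need something along these lines (versions managed by the Spring Boot parent):

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <scope>provided</scope>
</dependency>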

Step 4: Complete source code to retrieve data from BigQuery
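The snippets below are not complete classes; for reference, the BigQuery client classes they use come from the following imports:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import java.util.UUID;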

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

// Table where the query results will be written
TableId tableId = TableId.of("project-id", "dataset-name", "food_events");

// Query config
QueryJobConfiguration queryConfig = QueryJobConfiguration
    // Query to run
    .newBuilder("SELECT * FROM `bigquery-public-data.fda_food.food_events`")
    // Sets the table where to put query results. If not provided a new table is created.
    .setDestinationTable(tableId)
    .setUseLegacySql(false)
    // Sets the action that should occur if the destination table already exists.
    .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE)
    .build();

// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

// Wait for the query to complete.
queryJob = queryJob.waitFor();

// Check for errors
if (queryJob == null) {
  throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
  // You can also look at queryJob.getStatus().getExecutionErrors() for all
  // errors, not just the latest one.
  throw new RuntimeException(queryJob.getStatus().getError().toString());
}

Once the job finishes, we can iterate over the result and insert the records into MongoDB:

// Get the results.
TableResult result = queryJob.getQueryResults();

int batchSize = 10000;
ArrayList<FoodEvent> foodEventArrayList = new ArrayList<>();
// Iterate over all the rows
for (FieldValueList row : result.iterateAll()) {

  try {
    // Fetch the values
    String reportNumber = row.get("report_number").getStringValue();
    String reactions = row.get("reactions").getStringValue();
    String outcomes = row.get("outcomes").getStringValue();
    String brandName = row.get("products_brand_name").getStringValue();
    String industryCode = row.get("products_industry_code").getStringValue();

    FoodEvent foodEvent = FoodEvent.builder().reportNumber(reportNumber).reactions(reactions)
        .outcomes(outcomes).brandName(brandName).industryCode(industryCode).build();
    foodEventArrayList.add(foodEvent);

    // Bulk insert in batches
    if (foodEventArrayList.size() >= batchSize) {
      // Add the batch of food events to the DB
      this.bulkMongoDbOperationService.bulkInsertEvents(foodEventArrayList);
      foodEventArrayList.clear();
    }
  } catch (Exception e) {
    log.error("Exception while parsing ({})", row, e);
  }
}

// Insert any remaining records that did not fill a full batch
if (foodEventArrayList.size() > 0) {
  this.bulkMongoDbOperationService.bulkInsertEvents(foodEventArrayList);
  foodEventArrayList.clear();
}
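Retrieving a large result set from BigQuery is paginated under the hood, and page size is one of the parameters we tuned (see the conclusion). One way to set it is via the client's QueryResultsOption when fetching the results, for example:

// Fetch results in larger pages to reduce round trips;
// 50000 is only an example value to experiment with.
TableResult result = queryJob.getQueryResults(BigQuery.QueryResultsOption.pageSize(50000));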

Step 5: Inserting into MongoDB

You can persist the documents into MongoDB either one by one or in batches. Inserting the records in batches can improve performance dramatically, so we insert them in batches as below:

public void bulkInsertEvents(List<FoodEvent> foodEventList) {
  try {
    BulkWriteResult bulkWriteResult =
        mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, FoodEvent.class)
            .insert(foodEventList).execute();
  } catch (BulkWriteException bwe) {
    // Collect and log the indexes of the documents that failed to insert
    List<Integer> failedEventIndexes = new ArrayList<>();
    for (BulkWriteError e : bwe.getWriteErrors()) {
      failedEventIndexes.add(e.getIndex());
      log.error("Insertion failed for index: {} with error: {}", e.getIndex(), e);
    }
  } catch (Exception e) {
    log.error("Bulk Insertion failed for {} events: {}", foodEventList.size(), foodEventList, e);
  }
}
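For completeness, FoodEvent is just a mapped document whose fields mirror the columns read from BigQuery. A minimal sketch, assuming Lombok and Spring Data MongoDB annotations (the collection name is an assumption), could look like:

import lombok.Builder;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

// Minimal sketch of the FoodEvent document
@Data
@Builder
@Document(collection = "food_events")
public class FoodEvent {
  @Id
  private String id;
  private String reportNumber;
  private String reactions;
  private String outcomes;
  private String brandName;
  private String industryCode;
}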

Conclusion: We were able to transfer the data from BigQuery to MongoDB using the steps above. We found that while BigQuery is quite fast at processing large sets of data, retrieving large result sets was a bottleneck: paginating through the results and inserting them into the DB took a lot of time. We experimented with different page sizes and batch sizes to find the optimal parameters for us. As our data size was not very large (approximately 1.5 million records), we were fine with this approach. But if your dataset is huge, you might want to try a different approach. Do comment if you have any suggestions or a totally new/better approach.

References:

https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

https://cloud.google.com/bigquery/quotas

https://stackoverflow.com/questions/41025753/bigquery-retrieval-times-slow

https://cloud.google.com/bigquery/docs/paging-results

https://github.com/googleapis/python-bigquery/issues/112

