• Nagesh Singh Chauhan

Apache Beam: Headstart for a beginner


Credits:https://datatonic.com/insights/apache-beam-first-london-meetup/


INTRODUCTION


Apache Beam is an evolution of the Dataflow model created by Google to process massive amounts of data. The name Beam (Batch + strEAM) comes from the idea of having a unified model for both batch and stream data processing. Programs written using Beam can be executed in different processing frameworks (via runners) using a set of different IOs.


On January 10, 2017, Beam got promoted as a Top-Level Apache Software Foundation project. It was an important milestone that validated the value of the project, the legitimacy of its community, and heralded its growing adoption. In the past year, Apache Beam has experienced tremendous momentum, with significant growth in both its community and feature set.


Why we actually need Apache beam when we already have Hadoop/Spark/Flink?


Well, there are many frameworks like Hadoop, Spark, Flink, Google Cloud Dataflow, etc that came into existence. But there has been no unified API that binds all these frameworks and data sources, and provide an abstraction to the application logic from big data ecosystem. Apache Beam framework provides an abstraction between your application logic and the big data ecosystem. To know about the comparison about Spark vs Beam check this.


Hence, there is no need to bother about the following aspects when you are writing your data processing application :

DataSource — Data source can be batches, micro-batches or streaming data

SDK — You may choose your SDK (Java, Python) that you are comfortable with to program your logic.

Runner — Once the application logic is written then you may choose one of the available runners (Apache Spark, Apache Flink, Google Cloud Dataflow, Apache Apex, Apache Gear pump (incubating) or Apache Samza) to run your application based on your requirements.

This is how, Beam lets you write your application logic once, and not mix and scramble the code with input specific parameters or runner specific parameters.


Before we start implementing our Beam application, we need to get aware of some core ideas that will be used later all the time. There are five main conceptions in Beam: Pipeline, PCollection, PTransform, ParDO, and DoFn.


Pipeline: A Pipeline encapsulates the workflow of your entire data processing tasks from start to finish. This includes reading input data, transforming that data, and writing output data. All Beam driver programs must create a Pipeline. When you create the Pipeline, you must also specify the execution options that tell the Pipeline where and how to run.


PCollection: A PCollection represents a distributed data set that your Beam pipeline operates on. The data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism


PTransform: A PTransform represents a data processing operation, or a step, in your pipeline. Every PTransform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces zero or more output PCollection objects.


ParDo: ParDo is a Beam transform for generic parallel processing. The ParDo processing paradigm is similar to the “Map” phase of a Map/Shuffle/Reduce-style algorithm: a ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero, one, or multiple elements to an output PCollection.


DoFn: A DoFn applies your logic in each element in the input PCollection and lets you populate the elements of an output PCollection. To be included in your pipeline, it’s wrapped in a ParDo PTransform.


The Google Cloud Dataflow Runner uses the Cloud Dataflow managed service. When you run your pipeline with the Cloud Dataflow service, the runner uploads your executable code and dependencies to a Google Cloud Storage bucket and creates a Cloud Dataflow job, which executes your pipeline on managed resources in Google Cloud Platform. When using Java, you must specify your dependency on the Dataflow Runner in your pom.xml.


<dependency>
    <groupId>com.google.cloud.dataflow</groupId>
    <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
    <version>2.5.0</version>
</dependency>


Real time implementation:


Now, let's start by implementing Hospital Charges Data Analysis in the United States. You can download the data from here.


Dataset Description:

DRG Definition: The code and description identifying the MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and procedures furnished by the hospital during their stay.

Provider Id: The CMS Certification Number (CCN) assigned to the Medicare-certified hospital facility.

Provider Name: The name of the provider.

Provider Street Address: The provider’s street address.

Provider City: The city where the provider is located.

Provider State: The state where the provider is located.

Provider Zip Code: The provider’s zip code.

Provider HRR: The Hospital Referral Region (HRR) where the provider is located.

Total Discharges: The number of discharges billed by the provider for inpatient hospital services.

Average Covered Charges: The provider’s average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of the differences in hospital charge structures.

Average Total Payments: The average total payments to all providers for the MS-DRG including the MSDRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Also included in the average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits.

Average Medicare Payments: The average amount that Medicare pays to the provider for Medicare’s share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits.


We will do our analysis using Apache Beam’s Java BeamSql API and will execute the code in Google Dataflow runner to solve the below problems :

Problem 1: Find the amount of Average Covered Charges per state. Problem 2: Find the amount of Average Medicare Payments charges per state. Problem 3: Find out the total number of Discharges per state and for each disease.


To work with BeamSql API, we must also be aware of BeamRecord. BeamRecord is an immutable tuple-like type to represent one element in a PCollection. The fields are described with a BeamRecordType.


I’m assuming that Google Storage has been taken as primary data storage.


Step 1: Import required libraries.



package my.proj;
import java.io.Reader;
import java.io.Serializable;
import java.sql.Types;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.lang.Integer;
import java.nio.charset.StandardCharsets;
import java.lang.Double;
import java.util.List;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.extensions.sql.BeamSql.SimpleQueryTransform;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import com.google.api.services.storage.Storage;
import com.google.appengine.tools.cloudstorage.ListItem;
import com.google.appengine.tools.cloudstorage.ListOptions;
import com.google.appengine.tools.cloudstorage.ListResult;
import com.google.appengine.tools.cloudstorage.RetryParams;
import com.google.appengine.tools.util.Logging;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.bigtable.admin.v2.StorageType;
import com.google.cloud.storage.StorageOptions;
import com.pojo.ClassHospital;

Step 2: Specify runner options, which includes Project name, Staging location, Runner, etc all in Google Cloud Platform(GCP).


DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setProject("Hospital_Charges_Analysis"); 
  options.setStagingLocation("gs://GoogleStorageBucket/staging");
  options.setRunner(DataflowRunner.class);
  DataflowRunner.fromOptions(options);

Step 3: Create Pipeline object p, as this is the entry point of our program.

Pipeline p = Pipeline.create(options);

Step 4: Read the data that we have downloaded using apply() transformation.


PCollection<String> hosp_obj =p.apply(TextIO.read().from("gs://GoogleStorageBucket/input/hosptital_charges.csv"));

Step 5: Now we have to read each row and specify the datatypes. For that, we need to create a ParDo function in which we will implement DoFn.(Assumption: We have already created a class ClassHospital which consists of all the column names and their getter() and setter() methods)



PCollection<ClassHospital> pojo = hosp_obj.apply(ParDo.of(new DoFn<String, ClassHospital>() { private static final long serialVersionUID = 1L;
   @ProcessElement
   public void processElement(ProcessContext c) {
    String[] strArr = c.element().split(",");

    ClassHospital obj = new ClassHospital();
    obj.setDRGDefinition(strArr[0]);
    obj.setProviderId(strArr[1]);
    obj.setProviderName(Integer.parseInteger(strArr[2]));
    obj.setProviderStreetAddress(strArr[3]);
    obj.setProviderCity((strArr[4]);
    obj.setProviderState(strArr[5]);
    obj.setProviderZipCode(Integer.parseInteger(strArr[6]));
    obj.setHospitalReferralRegionDescription(strArr[7]);
    obj.setTotalDischarges(Integer.parseInteger(strArr[8]));
    obj.setAverageCoveredCharges(Double.parseDouble(strArr[9]));
    obj.setAverageTotalPayments(Double.parseDouble(strArr[10]));
    obj.setAverageMedicarePayments(Double.parseDouble(strArr[11]));
    c.output(obj);
     }
     }
  }));

Step 6: Next we will create two lists. One for storing column names and others for storing respective datatypes.



List<String> Col_Names = Arrays.asList("DRGDefinition", "ProviderId", "ProviderName", "ProviderStreetAddress","ProviderCity", "ProviderState", "ProviderZipCode","HospitalReferralRegionDescription", "TotalDischarges", "AverageCoveredCharges", "AverageTotalPayments", "AverageMedicarePayments");  

List<Integer> Col_Types = Arrays.asList(Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.DOUBLE, Types.INTEGER, Types.VARCHAR, Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.DOUBLE,Types.DOUBLE,Types.DOUBLE);

Step 7: Create BeamRecordSqlType objectwhich will be used for converting our data into BeamRecord.


final BeamRecordSqlType record_Type = BeamRecordSqlType.create(Col_Names, Col_Types);

Step 8: Convert the data into BeamRecord using ParDo function and set the coder for encoding each record.



PCollection<BeamRecord> record = pojos.apply(ParDo.of(new DoFn<ClassHospital, BeamRecord>() {
   private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
    BeamRecord br = new BeamRecord(record_Type, c.element().DRGDefinition, c.element().ProviderId, c.element().ProviderName,
      c.element().ProviderStreetAddress, c.element().ProviderCity, c.element().ProviderState, c.element().ProviderZipCode, c.element().HospitalReferralRegionDescription, c.element().TotalDischarges, c.element().AverageCoveredCharges, c.element().AverageTotalPayments, c.element.AverageMedicarePayments);
    c.output(br); 
    }
  })).setCoder(record_Type.getRecordCoder());

Step 9: Now apply triggers to BeamRecord.


When collecting and grouping data into windows, Beam uses triggers to determine when to emit the aggregated results of each window (referred to as a pane). If you use Beam’s default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates all data has arrived and discards all subsequent data for that window.


PCollection<BeamRecord> record_trig= record.apply(Window.<BeamRecord>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(org.joda.time.Duration.standardMinutes(1)).discardingFiredPanes());

Step 10: We will now find the amount of Average Covered Charges per state.


PCollection<BeamRecord> result_1 = record_trig.apply(BeamSql.query(
  "SELECT ProviderState, AVG(AverageCoveredCharges) from PCOLLECTION GROUP BY ProviderState"));

Output:


Step 11: Now, find the amount of Average Medicare Payments charges per state.


PCollection<BeamRecord> result_2 = record_trig.apply(BeamSql.query(
  "SELECT ProviderState, AVG(AverageMedicarePayments) from PCOLLECTION GROUP BY ProviderState"));

Output:


Step 12: Next, We will find out the total number of Discharges per state and for each disease.


PCollection<BeamRecord> result_3 = record_trig.apply(BeamSql.query(
  "SELECT DRGDefinition, ProviderState, SUM(TotalDischarges) as Discharge from PCOLLECTION GROUP BY ProviderState, DRGDefinition"));

Output:


We can also sort our data.

PCollection<BeamRecord> result_4 = result_3.apply(BeamSql.query(
  "SELECT DRGDefinition, ProviderState, Discharge from PCOLLECTION ORDER BY Discharge"));

Output:


Step 13: We can also store our final pipeline data in Google storage, for that we need to convert our PCollection from BeamRecord to String.


PCollection<String> gs_output = result_4.apply(ParDo.of(new DoFn<BeamRecord, String>() {
     private static final long serialVersionUID = 1L;
@ProcessElement
     public void processElement(ProcessContext c) {
      c.output(c.element().toString());
      System.out.println(c.element().toString());
     }
    }));

Step 14: Finally writing our final PCollection String into Google Storage.


gs_output.apply(TextIO.write().to("gs://GoogleStorageBucket/output/result_4"));

Hope this helps and Thanks for reading !!!

Happy Learning !!!

18 views0 comments

Recent Posts

See All