Oracle Cloud Infrastructure (OCI) Data Flow is a fully managed big data service that lets you run Apache Spark applications at any scale with almost no administration. Spark has become the leading big data processing framework and OCI Data Flow is the easiest way to run Spark in Oracle Cloud because there’s nothing for developers to install or manage.
Run analytics on OCI Data Flow from an interactive Data Science notebook:
OCI Data Flow now integrates with interactive OCI Data Science notebooks, so users can start exploring their data from a notebook backed by Apache Spark. This article shows how to set up an OCI Data Flow environment on OCI using Conda, create an OCI Data Science notebook session, and connect it to the OCI Data Flow Spark platform through a Livy session. From the notebook you can submit fault-tolerant Spark jobs and retrieve their output using synchronous or asynchronous methods.
The integration also introduces SparkMagic commands, with its own additions and upgrades. SparkMagic enables interactive communication with Spark through Livy, using the `%%spark` magic directive within a JupyterLab code cell.
The purpose of this document is to walk you through the setup required to access OCI Data Flow Sessions from a Data Science notebook session. These sessions let you run interactive Spark workloads on a long-lasting Data Flow cluster through an Apache Livy integration.
Once the OCI Data Flow Spark session is created, we will go through sample code for Spark operations on OCI Object Storage and Autonomous Data Warehouse.
Features & Benefits:
◉ Use an interactive Jupyter notebook with OCI Data Flow.
◉ Data Flow Sessions support auto-scaling of the Data Flow cluster. Users can enable auto-scaling when creating the Spark Livy session, or from the notebook after the session is created.
◉ Data Flow Sessions support conda environments as customizable Spark runtime environments.
◉ Import open source libraries and use them after building the conda environment.
◉ Use Spark Delta Lake in OCI Data Flow to perform ACID transactions.
Use Interactive OCI Data Science Notebook with OCI Data Flow:
To use the OCI Data Science notebook with OCI Data Flow, follow the steps below:
◉ Create required buckets
◉ Create a bucket named dataflow-logs in your tenancy.
◉ Create a bucket named dataflow-warehouse in your tenancy.
◉ Create a dynamic group in a specific compartment:
ALL {resource.type='dataflowrun', resource.compartment.id='<compartment_id>'}
ALL {resource.type='datasciencenotebooksession', resource.compartment.id='<compartment_id>'}
Any {resource.type = 'datacatalogmetastore'}
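The matching rules above differ only in the compartment OCID, so they can be rendered from a single value before pasting them into the console. A minimal sketch, assuming a hypothetical helper named `dynamic_group_rules` (it is not part of any OCI SDK and only formats strings) and a placeholder OCID:

```python
from typing import List


def dynamic_group_rules(compartment_id: str) -> List[str]:
    """Render the three matching rules listed above for one compartment.

    Hypothetical helper for illustration only; it makes no OCI API calls.
    """
    scoped = "ALL {{resource.type='{rtype}', resource.compartment.id='{cid}'}}"
    return [
        scoped.format(rtype="dataflowrun", cid=compartment_id),
        scoped.format(rtype="datasciencenotebooksession", cid=compartment_id),
        "ANY {resource.type='datacatalogmetastore'}",
    ]


if __name__ == "__main__":
    # Placeholder OCID; replace it with your own compartment OCID.
    for rule in dynamic_group_rules("ocid1.compartment.oc1..example"):
        print(rule)
```

Generating the rules from one variable avoids pasting mismatched compartment OCIDs into the two scoped rules.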
◉ Create a policy to manage OCI resources from OCI Data Flow and Data Science:
ALLOW DYNAMIC-GROUP <df-dynamic-group> TO MANAGE objects IN TENANCY WHERE ANY
{target.bucket.name='<bucket_name>',
target.bucket.name='dataflow-logs',
target.bucket.name='dataflow-warehouse'
}
ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE dataflow-family
in compartment '<your-compartment-name>'
ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE data-catalog-metastores IN TENANCY
ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO READ buckets IN TENANCY
ALLOW DYNAMIC-GROUP '<dcat-hive-group>' TO MANAGE object-family IN TENANCY WHERE ANY
{ target.bucket.name = '<bucket_name>',
target.bucket.name = '<managed-table-location-bucket>',
target.bucket.name = '<external-table-location-bucket>'
}
ALLOW DYNAMIC-GROUP '<ds-dynamic-group>' TO MANAGE objects IN TENANCY
WHERE ALL {target.bucket.name='ds-conda-env'}
ALLOW DYNAMIC-GROUP '<df-dynamic-group>' TO MANAGE objects IN TENANCY
WHERE ALL {target.bucket.name='ds-conda-env'}
◉ Create an OCI Data Science project and notebook session.
◉ Open the new OCI Data Science session. From the File menu, choose New Launcher and click Terminal.
◉ Install and activate the pyspark32_p38_cpu_v1 conda environment from your terminal:
odsc conda install -s pyspark32_p38_cpu_v1
source activate /home/datascience/conda/pyspark32_p38_cpu_v1
◉ Once the conda environment is activated, go to the New Launcher tab and click Settings. Fill in the required information about the Object Storage location where the conda package will be uploaded, and save it.
◉ Now publish the conda environment:
odsc conda publish -s pyspark3_2anddataflowv1_0
Note: Publishing takes some time. Once it completes, you can see the conda package uploaded to the Object Storage bucket.
◉ Open a notebook from the New Launcher using "PySpark and DataFlow" as the kernel.
◉ Execute the commands below to set up and create a Data Flow Spark session using the Livy service:
1. Set up authentication using ADS
import ads
ads.set_auth("resource_principal") # Supported values: resource_principal, api_key
2. Load Extension
%load_ext dataflow.magics
3. Create OCI DataFlow Spark Session using Livy:
# Create an OCI Data Flow session using the Livy service from the OCI Data Science notebook.
import json
command = {
    "compartmentId": "ocid1.compartment.oc1..xxxxxxxxxxxxxx",
    "displayName": "Demo_DataFlow_Spark_v1",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E3.Flex",
    "executorShape": "VM.Standard.E3.Flex",
    "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
    "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
    "numExecutors": 1,
    "logsBucketUri": "<oci://bucket@namespace/>",
    "archiveUri": "<oci://bucket@namespace/archive.zip>",
    "configuration": {"spark.archives": "<oci://bucket@namespace/>#conda",
                      "spark.oracle.datasource.enabled": "true"}
}
# Wrap the JSON in single quotes so the magic receives it as one argument.
command = f'\'{json.dumps(command)}\''
print("command", command)
# Optional: to enable auto-scaling, add these properties to "configuration" above.
# "spark.dynamicAllocation.enabled": "true",
# "spark.dynamicAllocation.shuffleTracking.enabled": "true",
# "spark.dynamicAllocation.minExecutors": "1",
# "spark.dynamicAllocation.maxExecutors": "4",
# "spark.dynamicAllocation.executorIdleTimeout": "60",
# "spark.dynamicAllocation.schedulerBacklogTimeout": "60",
# "spark.dataflow.dynamicAllocation.quotaPolicy": "min"
%create_session -l python -c $command
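The session payload is an ordinary dictionary serialized to JSON and wrapped in single quotes before it is handed to `%create_session`. The sketch below shows one way to merge the auto-scaling properties listed in the comments into that payload. The helper name `build_session_command` and its parameters are hypothetical (they are not part of the Data Flow magics), and the property keys are copied from the commented-out block in this article.

```python
import json


def build_session_command(base, autoscale=False, min_executors=1, max_executors=4):
    """Return the single-quoted JSON string to pass to %create_session -c.

    Hypothetical helper: it copies the payload, optionally adds the
    dynamic-allocation properties, and serializes the result.
    """
    payload = dict(base)
    # Copy the nested configuration so the caller's dictionary is not mutated.
    config = dict(payload.get("configuration", {}))
    if autoscale:
        config.update({
            "spark.dynamicAllocation.enabled": "true",
            "spark.dynamicAllocation.shuffleTracking.enabled": "true",
            "spark.dynamicAllocation.minExecutors": str(min_executors),
            "spark.dynamicAllocation.maxExecutors": str(max_executors),
            "spark.dynamicAllocation.executorIdleTimeout": "60",
            "spark.dynamicAllocation.schedulerBacklogTimeout": "60",
            "spark.dataflow.dynamicAllocation.quotaPolicy": "min",
        })
    payload["configuration"] = config
    return "'" + json.dumps(payload) + "'"


if __name__ == "__main__":
    base = {
        "displayName": "Demo_DataFlow_Spark_v1",
        "numExecutors": 1,
        "configuration": {"spark.oracle.datasource.enabled": "true"},
    }
    command = build_session_command(base, autoscale=True)
    print(command)
```

In the notebook, the resulting string would be passed exactly as before with `%create_session -l python -c $command`.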
The OCI Data Flow Spark session is now created. Next, we will go through sample code for Spark operations on:
1. OCI Object Storage
2. OCI Autonomous Data Warehouse
◉ Now you can import the dependent libraries in the session:
%%spark
#Import required libraries.
import json
import os
import sys
import datetime
import oci
import pyspark.sql
from pyspark.sql.functions import countDistinct
from delta.tables import *
◉ Perform Spark Read operation on Object Storage.
Read Object Storage file using spark.read from Livy Session:
%%spark -o df_Bronze_Insurance_Data
# Read the insurance claim files from OCI Object Storage into a Spark DataFrame.
df_Bronze_Insurance_Data = spark.read.format("csv").option("header", "true") \
    .option("multiLine", "true").load("oci://test-demo@OSNamespace/insur_claim/claim.csv*")
print("df_Bronze_Insurance_Data", df_Bronze_Insurance_Data)
df_Bronze_Insurance_Data.show(5)
◉ Perform Spark Write operation on Object Storage.
%%spark
df_Bronze_Insurance_Data.write.format("json").mode("overwrite").save("oci://test-demo@OSNamespace/insur_claim/claim_curated")
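Both the read and the write address Object Storage with URIs of the form `oci://bucket@namespace/path`. A small sketch of composing and splitting such URIs with the standard library follows. The function names `oci_uri` and `split_oci_uri` are hypothetical helpers for illustration, and the bucket and namespace values are the placeholders used in this article.

```python
from urllib.parse import urlparse


def oci_uri(bucket, namespace, path=""):
    """Compose an oci://bucket@namespace/path URI."""
    return "oci://{}@{}/{}".format(bucket, namespace, path.lstrip("/"))


def split_oci_uri(uri):
    """Split an oci:// URI into (bucket, namespace, object path)."""
    parsed = urlparse(uri)
    if parsed.scheme != "oci" or "@" not in parsed.netloc:
        raise ValueError("not an oci://bucket@namespace/path URI: " + uri)
    bucket, namespace = parsed.netloc.split("@", 1)
    return bucket, namespace, parsed.path.lstrip("/")


if __name__ == "__main__":
    target = oci_uri("test-demo", "OSNamespace", "insur_claim/claim_curated")
    print(target)
    print(split_oci_uri(target))
```

Building the URI from its parts keeps the bucket and namespace in one place when several reads and writes share them.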
◉ Perform Read & Write operations on Autonomous Data Warehouse:
◉ Load data into ADW using a Vault secret for the wallet password:
Copy the code below as is.
Reference: https://github.com/oracle-samples/oracle-dataflow-samples/tree/main/python/loadadw
%%spark
def get_authenticated_client(token_path, client, file_location=None, profile_name=None):
    """
    Get an authenticated OCI client.
    Example: get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
    """
    import oci
    if not in_dataflow():
        # We are running locally, use our API Key.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE
        config = oci.config.from_file(file_location=file_location, profile_name=profile_name)
        authenticated_client = client(config)
    else:
        # We are running in Data Flow, use our Delegation Token.
        with open(token_path) as fd:
            delegation_token = fd.read()
        signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
            delegation_token=delegation_token
        )
        authenticated_client = client(config={}, signer=signer)
    return authenticated_client
def get_password_from_secrets(token_path, password_ocid):
    """
    Get a password from the OCI Secrets Service.
    """
    import base64
    import oci
    secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)
    response = secrets_client.get_secret_bundle(password_ocid)
    base64_secret_content = response.data.secret_bundle_content.content
    base64_secret_bytes = base64_secret_content.encode("ascii")
    base64_message_bytes = base64.b64decode(base64_secret_bytes)
    secret_content = base64_message_bytes.decode("ascii")
    return secret_content
def get_delegation_token_path(spark):
    """
    Get the delegation token path when we're running in Data Flow.
    """
    if not in_dataflow():
        return None
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    token_path = spark.sparkContext.getConf().get(token_key)
    if not token_path:
        raise Exception(f"{token_key} is not set")
    return token_path
def get_temporary_directory():
    if in_dataflow():
        return "/opt/spark/work-dir/"
    else:
        import tempfile
        return tempfile.gettempdir()
def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.
    """
    if os.environ.get("HOME") == "/home/dataflow":
        return True
    return False
def download_wallet(spark, wallet_path):
    """
    Download an ADW/ATP wallet file and prepare it for use in a Data Flow
    application.
    """
    import oci
    import zipfile
    # Get an object store client.
    token_path = get_delegation_token_path(spark)
    object_store_client = get_authenticated_client(
        token_path, oci.object_storage.ObjectStorageClient
    )
    # Download the wallet file.
    from urllib.parse import urlparse
    parsed = urlparse(wallet_path)
    bucket_name, namespace = parsed.netloc.split("@")
    file_name = parsed.path[1:]
    response = object_store_client.get_object(namespace, bucket_name, file_name)
    temporary_directory = get_temporary_directory()
    zip_file_path = os.path.join(temporary_directory, "wallet.zip")
    with open(zip_file_path, "wb") as fd:
        for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):
            fd.write(chunk)
    # Extract everything locally.
    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        zip_ref.extractall(temporary_directory)
    # Distribute all wallet files.
    contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split()
    spark_context = spark.sparkContext
    for file in contents:
        spark_context.addFile(os.path.join(temporary_directory, file))
    return temporary_directory
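The helpers above pick their authentication method and working directory by checking whether `HOME` is `/home/dataflow`. The sketch below isolates that check so both branches can be exercised without a Data Flow cluster. The optional `environ` parameter is an addition made for illustration and is not in the referenced sample.

```python
import os
import tempfile

DATAFLOW_HOME = "/home/dataflow"
DATAFLOW_WORK_DIR = "/opt/spark/work-dir/"


def in_dataflow(environ=None):
    """Return True when HOME matches the Data Flow runtime's home directory."""
    env = os.environ if environ is None else environ
    return env.get("HOME") == DATAFLOW_HOME


def get_temporary_directory(environ=None):
    """Use the Spark work directory in Data Flow, the system temp dir elsewhere."""
    if in_dataflow(environ):
        return DATAFLOW_WORK_DIR
    return tempfile.gettempdir()


if __name__ == "__main__":
    # Simulate both environments by passing explicit mappings.
    print(get_temporary_directory({"HOME": "/home/dataflow"}))
    print(get_temporary_directory({"HOME": "/home/datascience"}))
```

Passing the environment as an argument keeps the production behavior (reading `os.environ`) while making the branch selection easy to verify locally.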
◉ Set the parameters below for the ADW instance and wallet.
%%spark
PASSWORD_SECRET_OCID = "ocid1.vaultsecret.oc1.phx.xxxxxxx"
TARGET_TABLE = "ADMIN.TB_NAME"
TNSNAME = "demolakehouseadw_medium"
USER = "admin"
WALLET_PATH = "oci://bucketname@osnamespace/Wallet_DemoLakeHouseADW.zip"
# Download and distribute our wallet file.
wallet_path = download_wallet(spark, WALLET_PATH)
adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNSNAME, wallet_path)
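The JDBC URL combines the TNS alias with the directory that holds the extracted wallet. Before connecting, it can help to confirm that the alias actually appears in the wallet's `tnsnames.ora`. The following is a rough sketch with two hypothetical helpers, `tns_aliases` and `build_adw_url`. It assumes every alias starts a line and is followed by `=`, which is how ADW wallets are typically laid out, and the sample entry is made up.

```python
import re


def tns_aliases(tnsnames_text):
    """List alias names that start a line in tnsnames.ora text (lowercased)."""
    found = re.findall(r"^\s*([\w.-]+)\s*=", tnsnames_text, re.MULTILINE)
    return [alias.lower() for alias in found]


def build_adw_url(tns_name, wallet_dir):
    """Build the thin-driver URL in the same format used in this article."""
    return "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(tns_name, wallet_dir)


if __name__ == "__main__":
    # Made-up tnsnames.ora content for illustration.
    sample = "demolakehouseadw_medium = (description=(address=(host=example)))\n"
    alias = "demolakehouseadw_medium"
    if alias.lower() not in tns_aliases(sample):
        raise ValueError("alias not found in tnsnames.ora: " + alias)
    print(build_adw_url(alias, "/opt/spark/work-dir/"))
```

A mistyped alias otherwise only surfaces later as a connection error from the JDBC driver.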
◉ Now get the password using the secret service.
%%spark
# Get our password using the secret service.
print("Getting wallet password")
token_path = get_delegation_token_path(spark)
password = get_password_from_secrets(token_path, PASSWORD_SECRET_OCID)
print("Done getting wallet password")
# Connection properties for the JDBC reads and writes that follow.
print("Using ADW connection " + adw_url)
properties = {
    "driver": "oracle.jdbc.driver.OracleDriver",
    "oracle.net.tns_admin": TNSNAME,
    "password": password,
    "user": USER
}
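The secret bundle returns its content base64-encoded, which is why `get_password_from_secrets` decodes it before use. Here is a minimal sketch of that decoding step, plus a way to print the connection properties without exposing the password. Both function names are hypothetical. Unlike the sample above, which decodes as ASCII, this sketch assumes UTF-8 so that non-ASCII passwords also decode.

```python
import base64


def decode_secret_content(base64_content, encoding="utf-8"):
    """Decode the base64 text from a secret bundle into a string."""
    return base64.b64decode(base64_content.encode("ascii")).decode(encoding)


def redact(properties, hidden=("password",)):
    """Return a copy of the properties that is safe to print or log."""
    return {k: ("***" if k in hidden else v) for k, v in properties.items()}


if __name__ == "__main__":
    # Stand-in for response.data.secret_bundle_content.content.
    encoded = base64.b64encode(b"example-password").decode("ascii")
    props = {"user": "admin", "password": decode_secret_content(encoded)}
    print(redact(props))
```

Printing the redacted copy is a safer way to debug the connection settings in a shared notebook.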
◉ Read Sample Table from ADW.
%%spark
SOURCE_TABLE = "ADMIN.RETAILPOS"
df_RetailPOS_15min = spark.read.jdbc(url=adw_url, table=SOURCE_TABLE, properties=properties)
◉ Load the above DataFrame into ADW.
%%spark
#Load into ADW:
TARGET_TABLE = "ADMIN.RETAILPOS_15MINUTES"
print("TARGET_TABLE : ",TARGET_TABLE)
# Write to ADW.
print("Write to ADW : ")
df_RetailPOS_15min.write.jdbc(url=adw_url, table=TARGET_TABLE, mode="Append", properties=properties)
print("Writing done to ADW : ")
Source: oracle.com