Connecting to S3 bucket with Sedona using Scala in Wherobots

Hi there,
Just yesterday I was on a call with staff members Will and Daniel, who helped me connect to my S3 bucket in Python. However, we ran into a problem with Scala.
This is the Python code, which works:
from sedona.spark import SedonaContext
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from pyspark.sql import SparkSession
from sedona.sql.types import GeometryType
from pyspark.sql.functions import col
from pyspark.sql import functions as F
import sedona.sql.st_functions as st
from pyspark.sql.functions import expr

spark = (
    SparkSession.builder
    .appName("SpatialQuery")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .config("spark.hadoop.fs.s3a.bucket.<bucket_name>.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.bucket.<bucket_name>.access.key", "...")
    .config("spark.hadoop.fs.s3a.bucket.<bucket_name>.secret.key", "...")
    .getOrCreate()
)

SedonaRegistrator.registerAll(spark)
data_source = 's3://<bucket_name>/Parquet/'
output_path = 's3://<bucket_name>/ParquetResults/'

df_gt = spark.read.format("parquet").load(data_source + 'geodetskatocka.parquet')
df_kc = spark.read.format("parquet").load(data_source + 'katcestica.parquet')
df_ko = spark.read.format("parquet").load(data_source + 'katopcina.parquet')
df_kz = spark.read.format("parquet").load(data_source + 'koristenjezemljista.parquet')
df_mt = spark.read.format("parquet").load(data_source + 'medjnatocka.parquet')
df_z = spark.read.format("parquet").load(data_source + 'zgrade.parquet')

And this is what I have tried in Scala:
import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader
import org.apache.sedona.spark.SedonaContext
import org.apache.sedona.sql.utils.Adapter
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.desc

val catalogName = "someName"
val bucketName = "<bucket_name>"
val parquetFolderPath = s"s3://$bucketName/"
val awsRegion = "my_region"
val awsAccessKeyId = "..."
val awsSecretAccessKey = "..."

val spark = SparkSession.builder()
  .appName("SedonaApp")
  .config("spark.sql.catalog." + catalogName + ".type", "hadoop")
  .config("spark.sql.catalog." + catalogName, "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog." + catalogName + ".warehouse", parquetFolderPath)
  .config("spark.sql.catalog." + catalogName + ".io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .config("spark.sql.catalog." + catalogName + ".client.region", awsRegion)
  .config("spark.sql.catalog." + catalogName + ".s3.access-key-id", awsAccessKeyId)
  .config("spark.sql.catalog." + catalogName + ".s3.secret-access-key", awsSecretAccessKey)
  .config("spark.hadoop.fs.s3a.bucket." + bucketName + ".aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
  .config("spark.hadoop.fs.s3a.bucket." + bucketName + ".access.key", awsAccessKeyId)
  .config("spark.hadoop.fs.s3a.bucket." + bucketName + ".secret.key", awsSecretAccessKey)
  .getOrCreate()

val sedona = SedonaContext.create(spark)
val sc = sedona.sparkContext

val parquetDF = spark.read.parquet(parquetFolderPath + "Parquet/zgrade.parquet")
parquetDF.printSchema()
parquetDF.show()

but I am getting a 403 Forbidden error. This is the stack trace I get:
java.nio.file.AccessDeniedException: s3://agdm/Parquet/zgrade.parquet: getFileStatus on s3://agdm/Parquet/zgrade.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: Z34NMFYEBM94BRB8; S3 Extended Request ID: YSabaCpLWy3CLLKrAaSzpp/o2AQYYCCf+M1QI1XA9czrM7EDFBt0DhUun5Z8uRDcYmJqaOreEDM=; Proxy: null), S3 Extended Request ID: YSabaCpLWy3CLLKrAaSzpp/o2AQYYCCf+M1QI1XA9czrM7EDFBt0DhUun5Z8uRDcYmJqaOreEDM=:403 Forbidden
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:756)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:393)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1423)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1841)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: Z34NMFYEBM94BRB8; S3 Extended Request ID: YSabaCpLWy3CLLKrAaSzpp/o2AQYYCCf+M1QI1XA9czrM7EDFBt0DhUun5Z8uRDcYmJqaOreEDM=; Proxy: null)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5456)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5403)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1372)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776)
… 23 more

Does anyone know why this happens? The configuration is more or less the same as in Python, and I am using the same access key and secret access key.

Hi @HrvojeB

The Wherobots Scala notebook works a bit differently from our Python notebook.

If you take a look at our Scala notebook example, the config must be set before you create the Spark / Sedona context.

As the example notebook shows, the way to set configs in the Scala notebook is as follows:

%%init_spark
launcher.conf.set("spark.sql.catalog." + catalogName + ".type", "hadoop")
launcher.conf.set("spark.sql.catalog." + catalogName, "org.apache.iceberg.spark.SparkCatalog")
launcher.conf.set("spark.sql.catalog." + catalogName + ".warehouse", parquetFolderPath)
launcher.conf.set("spark.sql.catalog." + catalogName + ".io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
launcher.conf.set("spark.sql.catalog." + catalogName + ".client.region", awsRegion)
launcher.conf.set("spark.sql.catalog." + catalogName + ".s3.access-key-id", awsAccessKeyId)
launcher.conf.set("spark.sql.catalog." + catalogName + ".s3.secret-access-key", awsSecretAccessKey)
launcher.conf.set("spark.hadoop.fs.s3a.bucket." + bucketName + ".access.key", awsAccessKeyId)
launcher.conf.set("spark.hadoop.fs.s3a.bucket." + bucketName + ".secret.key", awsSecretAccessKey)

I have just tried this; here is what I ran:
%%init_spark
launcher.conf.set("spark.sql.catalog.someCatalogName.type", "hadoop")
launcher.conf.set("spark.sql.catalog.someCatalogName", "org.apache.iceberg.spark.SparkCatalog")
launcher.conf.set("spark.sql.catalog.someCatalogName.warehouse", "s3://agdm/")
launcher.conf.set("spark.sql.catalog.someCatalogName.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
launcher.conf.set("spark.sql.catalog.someCatalogName.client.region", "us-west-1")
launcher.conf.set("spark.sql.catalog.someCatalogName.s3.access-key-id", "...")
launcher.conf.set("spark.sql.catalog.someCatalogName.s3.secret-access-key", "...")
launcher.conf.set("spark.hadoop.fs.s3a.bucket.agdm.access.key", "...")
launcher.conf.set("spark.hadoop.fs.s3a.bucket.agdm.secret.key", "...")
import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader
import org.apache.sedona.spark.SedonaContext
import org.apache.sedona.sql.utils.Adapter
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.desc

val parquetFolderPath = "s3://agdm/"
val sedona = SedonaContext.create(spark)
val sc = sedona.sparkContext

val parquetDF = spark.read.parquet(parquetFolderPath + "Parquet/zgrade.parquet")
parquetDF.printSchema()
parquetDF.show()

but I am still getting a 403 Forbidden error.

You can try adding the following additional configuration to the %%init_spark cell:

launcher.conf.set("spark.hadoop.fs.s3a.bucket.agdm.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

This makes Spark use the access key and secret key you provided when reading the S3 bucket named agdm.
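For reference, a complete %%init_spark cell with this extra line merged into your earlier one might look roughly like the sketch below (the region and the "..." keys are the placeholders from the earlier posts):

%%init_spark
launcher.conf.set("spark.sql.catalog.someCatalogName.type", "hadoop")
launcher.conf.set("spark.sql.catalog.someCatalogName", "org.apache.iceberg.spark.SparkCatalog")
launcher.conf.set("spark.sql.catalog.someCatalogName.warehouse", "s3://agdm/")
launcher.conf.set("spark.sql.catalog.someCatalogName.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
launcher.conf.set("spark.sql.catalog.someCatalogName.client.region", "us-west-1")
launcher.conf.set("spark.sql.catalog.someCatalogName.s3.access-key-id", "...")
launcher.conf.set("spark.sql.catalog.someCatalogName.s3.secret-access-key", "...")
launcher.conf.set("spark.hadoop.fs.s3a.bucket.agdm.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
launcher.conf.set("spark.hadoop.fs.s3a.bucket.agdm.access.key", "...")
launcher.conf.set("spark.hadoop.fs.s3a.bucket.agdm.secret.key", "...")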

@HrvojeB Please let us know if the new suggestion works.

This worked! Thanks!